From df0ad4d875c4a9295417399b3e728276fcad2f1c Mon Sep 17 00:00:00 2001 From: kevgliss Date: Mon, 9 May 2016 11:00:16 -0700 Subject: [PATCH] Authorities marshmallow addition (#303) --- lemur/authorities/schemas.py | 79 +++++ lemur/authorities/service.py | 37 +-- lemur/authorities/views.py | 77 ++--- lemur/certificates/schemas.py | 224 +------------ lemur/certificates/service.py | 2 +- lemur/common/validators.py | 120 +++++++ lemur/database.py | 2 + lemur/schemas.py | 122 ++++++- .../authorities/authority/authority.js | 2 +- .../authority/distinguishedName.tpl.html | 10 +- .../authorities/authority/options.tpl.html | 32 +- .../authorities/authority/tracking.tpl.html | 63 +++- .../app/angular/authorities/services.js | 10 +- .../app/angular/authorities/view/view.js | 3 - .../certificate/tracking.tpl.html | 23 +- .../angular/certificates/view/view.tpl.html | 12 +- lemur/tests/conftest.py | 7 + lemur/tests/test_authorities.py | 306 +++++++++--------- lemur/tests/test_certificates.py | 17 +- 19 files changed, 619 insertions(+), 529 deletions(-) create mode 100644 lemur/common/validators.py diff --git a/lemur/authorities/schemas.py b/lemur/authorities/schemas.py index e69de29b..3012d9b2 100644 --- a/lemur/authorities/schemas.py +++ b/lemur/authorities/schemas.py @@ -0,0 +1,79 @@ +""" +.. module: lemur.authorities.schemas + :platform: unix + :copyright: (c) 2015 by Netflix Inc., see AUTHORS for more + :license: Apache, see LICENSE for more details. +.. moduleauthor:: Kevin Glisson +""" +from flask import current_app + +from marshmallow import fields, validates_schema +from marshmallow import validate +from marshmallow.exceptions import ValidationError + +from lemur.schemas import PluginSchema, ExtensionSchema, AssociatedAuthoritySchema, AssociatedRoleSchema +from lemur.common.schema import LemurInputSchema, LemurOutputSchema +from lemur.common import validators + + +class AuthorityInputSchema(LemurInputSchema): + name = fields.String(required=True) + owner = fields.Email(required=True) + description = fields.String() + common_name = fields.String(required=True, validate=validators.sensitive_domain) + + validity_start = fields.DateTime() + validity_end = fields.DateTime() + validity_years = fields.Integer() + + # certificate body fields + organizational_unit = fields.String(missing=lambda: current_app.config.get('LEMUR_DEFAULT_ORGANIZATIONAL_UNIT')) + organization = fields.String(missing=lambda: current_app.config.get('LEMUR_DEFAULT_ORGANIZATION')) + location = fields.String(missing=lambda: current_app.config.get('LEMUR_DEFAULT_LOCATION')) + country = fields.String(missing=lambda: current_app.config.get('LEMUR_DEFAULT_COUNTRY')) + state = fields.String(missing=lambda: current_app.config.get('LEMUR_DEFAULT_STATE')) + + plugin = fields.Nested(PluginSchema) + + # signing related options + type = fields.String(validate=validate.OneOf(['root', 'subca']), missing='root') + authority = fields.Nested(AssociatedAuthoritySchema) + signing_algorithm = fields.String(validate=validate.OneOf(['sha256WithRSA', 'sha1WithRSA']), missing='sha256WithRSA') + key_type = fields.String(validate=validate.OneOf(['RSA2048', 'RSA4096']), missing='RSA2048') + key_name = fields.String() + sensitivity = fields.String(validate=validate.OneOf(['medium', 'high']), missing='medium') + serial_number = fields.Integer() + first_serial = fields.Integer(missing=1) + + extensions = fields.Nested(ExtensionSchema) + + roles = fields.Nested(AssociatedRoleSchema(many=True)) + + @validates_schema + def validate_dates(self, data): + validators.dates(data) + + @validates_schema + def validate_subca(self, data): + if data['type'] == 'subca': + if not data.get('authority'): + raise ValidationError("If generating a subca parent 'authority' must be specified.") + + +class AuthorityOutputSchema(LemurOutputSchema): + id = fields.Integer() + name = fields.String() + owner = fields.Email() + not_before = fields.DateTime() + not_after = fields.DateTime() + plugin_name = fields.String() + body = fields.String() + chain = fields.String() + active = fields.Boolean() + options = fields.Dict() + roles = fields.List(fields.Nested(AssociatedRoleSchema)) + + +authority_input_schema = AuthorityInputSchema() +authority_output_schema = AuthorityOutputSchema() +authorities_output_schema = AuthorityOutputSchema(many=True) diff --git a/lemur/authorities/service.py b/lemur/authorities/service.py index 1d3d2779..3bd0dbf0 100644 --- a/lemur/authorities/service.py +++ b/lemur/authorities/service.py @@ -19,8 +19,6 @@ from lemur.notifications import service as notification_service from lemur.roles.models import Role from lemur.certificates.models import Certificate -from lemur.plugins.base import plugins - def update(authority_id, description=None, owner=None, active=None, roles=None): """ @@ -49,20 +47,20 @@ def create(kwargs): :return: """ - issuer = plugins.get(kwargs.get('pluginName')) + issuer = kwargs['plugin'] kwargs['creator'] = g.current_user.email cert_body, intermediate, issuer_roles = issuer.create_authority(kwargs) cert = Certificate(cert_body, chain=intermediate) - cert.owner = kwargs['ownerEmail'] + cert.owner = kwargs['owner'] - if kwargs['caType'] == 'subca': + if kwargs['type'] == 'subca': cert.description = "This is the ROOT certificate for the {0} sub certificate authority the parent \ - authority is {1}.".format(kwargs.get('caName'), kwargs.get('caParent')) + authority is {1}.".format(kwargs.get('name'), kwargs.get('parent')) else: cert.description = "This is the ROOT certificate for the {0} certificate authority.".format( - kwargs.get('caName') + kwargs.get('name') ) cert.user = g.current_user @@ -79,7 +77,7 @@ def create(kwargs): role = role_service.create( r['name'], password=r['password'], - description="{0} auto generated role".format(kwargs.get('pluginName')), + description="{0} auto generated role".format(kwargs['plugin'].title), username=r['username']) # the user creating the authority should be able to administer it @@ -89,11 +87,11 @@ def create(kwargs): role_objs.append(role) authority = Authority( - kwargs.get('caName'), - kwargs['ownerEmail'], - kwargs['pluginName'], + kwargs.get('name'), + kwargs['owner'], + kwargs['plugin'].slug, cert_body, - description=kwargs['caDescription'], + description=kwargs['description'], chain=intermediate, roles=role_objs ) @@ -102,10 +100,10 @@ def create(kwargs): authority = database.create(authority) # the owning dl or role should have this authority associated with it - owner_role = role_service.get_by_name(kwargs['ownerEmail']) + owner_role = role_service.get_by_name(kwargs['owner']) if not owner_role: - owner_role = role_service.create(kwargs['ownerEmail']) + owner_role = role_service.create(kwargs['owner']) owner_role.authority = authority @@ -170,10 +168,6 @@ def render(args): :return: """ query = database.session_query(Authority) - sort_by = args.pop('sort_by') - sort_dir = args.pop('sort_dir') - page = args.pop('page') - count = args.pop('count') filt = args.pop('filter') if filt: @@ -191,9 +185,4 @@ def render(args): authority_ids.append(role.authority.id) query = query.filter(Authority.id.in_(authority_ids)) - query = database.find_all(query, Authority, args) - - if sort_by and sort_dir: - query = database.sort(query, Authority, sort_by, sort_dir) - - return database.paginate(query, page, count) + return database.sort_and_page(query, Authority, args) diff --git a/lemur/authorities/views.py b/lemur/authorities/views.py index f5e85d75..2f1ca9a7 100644 --- a/lemur/authorities/views.py +++ b/lemur/authorities/views.py @@ -6,31 +6,19 @@ .. moduleauthor:: Kevin Glisson """ from flask import Blueprint, g -from flask.ext.restful import reqparse, fields, Api +from flask.ext.restful import reqparse, Api -from lemur.authorities import service -from lemur.roles import service as role_service -from lemur.certificates import service as certificate_service +from lemur.common.utils import paginated_parser +from lemur.common.schema import validate_schema from lemur.auth.service import AuthenticatedResource - from lemur.auth.permissions import AuthorityPermission -from lemur.common.utils import paginated_parser, marshal_items +from lemur.roles import service as role_service +from lemur.certificates import service as certificate_service +from lemur.authorities import service +from lemur.authorities.schemas import authority_input_schema, authority_output_schema, authorities_output_schema -FIELDS = { - 'name': fields.String, - 'owner': fields.String, - 'description': fields.String, - 'options': fields.Raw, - 'pluginName': fields.String, - 'body': fields.String, - 'chain': fields.String, - 'active': fields.Boolean, - 'notBefore': fields.DateTime(dt_format='iso8601', attribute='not_before'), - 'notAfter': fields.DateTime(dt_format='iso8601', attribute='not_after'), - 'id': fields.Integer, -} mod = Blueprint('authorities', __name__) api = Api(mod) @@ -42,7 +30,7 @@ class AuthoritiesList(AuthenticatedResource): self.reqparse = reqparse.RequestParser() super(AuthoritiesList, self).__init__() - @marshal_items(FIELDS) + @validate_schema(None, authorities_output_schema) def get(self): """ .. http:get:: /authorities @@ -98,8 +86,8 @@ class AuthoritiesList(AuthenticatedResource): args = parser.parse_args() return service.render(args) - @marshal_items(FIELDS) - def post(self): + @validate_schema(authority_input_schema, authority_output_schema) + def post(self, data=None): """ .. http:post:: /authorities @@ -180,25 +168,7 @@ class AuthoritiesList(AuthenticatedResource): :statuscode 403: unauthenticated :statuscode 200: no error """ - self.reqparse.add_argument('caName', type=str, location='json', required=True) - self.reqparse.add_argument('caDescription', type=str, location='json', required=False) - self.reqparse.add_argument('ownerEmail', type=str, location='json', required=True) - self.reqparse.add_argument('caDN', type=dict, location='json', required=False) - self.reqparse.add_argument('validityStart', type=str, location='json', required=False) # TODO validate - self.reqparse.add_argument('validityEnd', type=str, location='json', required=False) # TODO validate - self.reqparse.add_argument('extensions', type=dict, location='json', required=False) - self.reqparse.add_argument('pluginName', type=str, location='json', required=True) - self.reqparse.add_argument('caType', type=str, location='json', required=False) - self.reqparse.add_argument('caParent', type=str, location='json', required=False) - self.reqparse.add_argument('caSigningAlgo', type=str, location='json', required=False) - self.reqparse.add_argument('keyType', type=str, location='json', required=False) - self.reqparse.add_argument('caSensitivity', type=str, location='json', required=False) - self.reqparse.add_argument('caKeyName', type=str, location='json', required=False) - self.reqparse.add_argument('caSerialNumber', type=int, location='json', required=False) - self.reqparse.add_argument('caFirstSerial', type=int, location='json', required=False) - - args = self.reqparse.parse_args() - return service.create(args) + return service.create(data) class Authorities(AuthenticatedResource): @@ -206,7 +176,7 @@ class Authorities(AuthenticatedResource): self.reqparse = reqparse.RequestParser() super(Authorities, self).__init__() - @marshal_items(FIELDS) + @validate_schema(None, authority_output_schema) def get(self, authority_id): """ .. http:get:: /authorities/1 @@ -248,8 +218,8 @@ class Authorities(AuthenticatedResource): """ return service.get(authority_id) - @marshal_items(FIELDS) - def put(self, authority_id): + @validate_schema(authority_input_schema, authority_output_schema) + def put(self, authority_id, data=None): """ .. http:put:: /authorities/1 @@ -295,12 +265,6 @@ class Authorities(AuthenticatedResource): :statuscode 200: no error :statuscode 403: unauthenticated """ - self.reqparse.add_argument('roles', type=list, default=[], location='json') - self.reqparse.add_argument('active', type=str, location='json', required=True) - self.reqparse.add_argument('owner', type=str, location='json', required=True) - self.reqparse.add_argument('description', type=str, location='json', required=True) - args = self.reqparse.parse_args() - authority = service.get(authority_id) role = role_service.get_by_name(authority.owner) @@ -313,7 +277,7 @@ class Authorities(AuthenticatedResource): # we want to make sure that we cannot add roles that we are not members of if not g.current_user.is_admin: - role_ids = set([r['id'] for r in args['roles']]) + role_ids = set([r['id'] for r in data['roles']]) user_role_ids = set([r.id for r in g.current_user.roles]) if not role_ids.issubset(user_role_ids): @@ -322,10 +286,10 @@ class Authorities(AuthenticatedResource): if permission.can(): return service.update( authority_id, - owner=args['owner'], - description=args['description'], - active=args['active'], - roles=args['roles'] + owner=data['owner'], + description=data['description'], + active=data['active'], + roles=data['roles'] ) return dict(message="You are not authorized to update this authority"), 403 @@ -333,10 +297,9 @@ class Authorities(AuthenticatedResource): class CertificateAuthority(AuthenticatedResource): def __init__(self): - self.reqparse = reqparse.RequestParser() super(CertificateAuthority, self).__init__() - @marshal_items(FIELDS) + @validate_schema(None, authority_output_schema) def get(self, certificate_id): """ .. http:get:: /certificates/1/authority diff --git a/lemur/certificates/schemas.py b/lemur/certificates/schemas.py index b83612d2..0a70f758 100644 --- a/lemur/certificates/schemas.py +++ b/lemur/certificates/schemas.py @@ -7,197 +7,20 @@ """ from flask import current_app -import arrow - -from marshmallow import fields, validates_schema, pre_load, post_dump +from marshmallow import fields, validates_schema from marshmallow.exceptions import ValidationError -from cryptography import x509 -from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives import serialization - -from lemur.auth.permissions import SensitiveDomainPermission from lemur.schemas import AssociatedAuthoritySchema, AssociatedDestinationSchema, AssociatedCertificateSchema, \ - AssociatedNotificationSchema, PluginSchema -from lemur.common.schema import LemurInputSchema, LemurOutputSchema, LemurSchema - -from lemur.domains import service as domain_service - - -def validate_public_certificate(body): - """ - Determines if specified string is valid public certificate. - - :param body: - :return: - """ - try: - x509.load_pem_x509_certificate(bytes(body), default_backend()) - except Exception: - raise ValidationError('Public certificate presented is not valid.') - - -def validate_private_key(key): - """ - User to validate that a given string is a RSA private key - - :param key: - :return: :raise ValueError: - """ - try: - serialization.load_pem_private_key(bytes(key), None, backend=default_backend()) - except Exception: - raise ValidationError('Private key presented is not valid.') - - -def validate_domain(domain): - """ - Determines if domain has been marked as sensitive. - :param domain: - :return: - """ - domains = domain_service.get_by_name(domain) - for domain in domains: - # we only care about non-admins - if not SensitiveDomainPermission().can(): - if domain.sensitive: - raise ValidationError( - 'Domain {0} has been marked as sensitive, contact and administrator \ - to issue the certificate.'.format(domain)) - - -def validate_oid_type(oid_type): - """ - Determines if the specified oid type is valid. - :param oid_type: - :return: - """ - valid_types = ['b64asn1', 'string', 'ia5string'] - if oid_type.lower() not in [o_type.lower() for o_type in valid_types]: - raise ValidationError('Invalid Oid Type: {0} choose from {1}'.format(oid_type, ",".join(valid_types))) - - -def validate_sub_alt_type(alt_type): - """ - Determines if the specified subject alternate type is valid. - :param alt_type: - :return: - """ - valid_types = ['DNSName', 'IPAddress', 'uniFormResourceIdentifier', 'directoryName', 'rfc822Name', 'registrationID', - 'otherName', 'x400Address', 'EDIPartyName'] - if alt_type.lower() not in [a_type.lower() for a_type in valid_types]: - raise ValidationError('Invalid SubAltName Type: {0} choose from {1}'.format(type, ",".join(valid_types))) - - -def validate_csr(data): - """ - Determines if the CSR is valid. - :param data: - :return: - """ - try: - x509.load_pem_x509_csr(bytes(data), default_backend()) - except Exception: - raise ValidationError('CSR presented is not valid.') - - -class BaseExtensionSchema(LemurSchema): - @pre_load(pass_many=True) - def preprocess(self, data, many): - return self.under(data, many=many) - - @post_dump(pass_many=True) - def post_process(self, data, many): - if data: - data = self.camel(data, many=many) - return data - - -class BasicConstraintsSchema(BaseExtensionSchema): - pass - - -class AuthorityIdentifierSchema(BaseExtensionSchema): - use_authority_cert = fields.Boolean() - - -class AuthorityKeyIdentifierSchema(BaseExtensionSchema): - use_key_identifier = fields.Boolean() - - -class CertificateInfoAccessSchema(BaseExtensionSchema): - include_aia = fields.Boolean() - - @post_dump - def handle_keys(self, data): - return {'includeAIA': data['include_aia']} - - -class KeyUsageSchema(BaseExtensionSchema): - use_crl_sign = fields.Boolean() - use_data_encipherment = fields.Boolean() - use_decipher_only = fields.Boolean() - use_encipher_only = fields.Boolean() - use_key_encipherment = fields.Boolean() - use_digital_signature = fields.Boolean() - use_non_repudiation = fields.Boolean() - - -class ExtendedKeyUsageSchema(BaseExtensionSchema): - use_server_authentication = fields.Boolean() - use_client_authentication = fields.Boolean() - use_eap_over_lan = fields.Boolean() - use_eap_over_ppp = fields.Boolean() - use_ocsp_signing = fields.Boolean() - use_smart_card_authentication = fields.Boolean() - use_timestamping = fields.Boolean() - - -class SubjectKeyIdentifierSchema(BaseExtensionSchema): - include_ski = fields.Boolean() - - @post_dump - def handle_keys(self, data): - return {'includeSKI': data['include_ski']} - - -class SubAltNameSchema(BaseExtensionSchema): - name_type = fields.String(validate=validate_sub_alt_type) - value = fields.String() - - @validates_schema - def check_sensitive(self, data): - if data['name_type'] == 'DNSName': - validate_domain(data['value']) - - -class SubAltNamesSchema(BaseExtensionSchema): - names = fields.Nested(SubAltNameSchema, many=True) - - -class CustomOIDSchema(BaseExtensionSchema): - oid = fields.String() - oid_type = fields.String(validate=validate_oid_type) - value = fields.String() - - -class ExtensionSchema(BaseExtensionSchema): - basic_constraints = fields.Nested(BasicConstraintsSchema) - key_usage = fields.Nested(KeyUsageSchema) - extended_key_usage = fields.Nested(ExtendedKeyUsageSchema) - subject_key_identifier = fields.Nested(SubjectKeyIdentifierSchema) - sub_alt_names = fields.Nested(SubAltNamesSchema) - authority_identifier = fields.Nested(AuthorityIdentifierSchema) - authority_key_identifier = fields.Nested(AuthorityKeyIdentifierSchema) - certificate_info_access = fields.Nested(CertificateInfoAccessSchema) - custom = fields.List(fields.Nested(CustomOIDSchema)) + AssociatedNotificationSchema, PluginSchema, ExtensionSchema +from lemur.common.schema import LemurInputSchema, LemurOutputSchema +from lemur.common import validators class CertificateInputSchema(LemurInputSchema): name = fields.String() owner = fields.Email(required=True) description = fields.String() - common_name = fields.String(required=True, validate=validate_domain) + common_name = fields.String(required=True, validate=validators.sensitive_domain) authority = fields.Nested(AssociatedAuthoritySchema, required=True) validity_start = fields.DateTime() @@ -208,7 +31,7 @@ class CertificateInputSchema(LemurInputSchema): notifications = fields.Nested(AssociatedNotificationSchema, missing=[], many=True) replacements = fields.Nested(AssociatedCertificateSchema, missing=[], many=True) - csr = fields.String(validate=validate_csr) + csr = fields.String(validate=validators.csr) # certificate body fields organizational_unit = fields.String(missing=lambda: current_app.config.get('LEMUR_DEFAULT_ORGANIZATIONAL_UNIT')) @@ -221,34 +44,7 @@ class CertificateInputSchema(LemurInputSchema): @validates_schema def validate_dates(self, data): - if not data.get('validity_start') and data.get('validity_end'): - raise ValidationError('If validity start is specified so must validity end.') - - if not data.get('validity_end') and data.get('validity_start'): - raise ValidationError('If validity end is specified so must validity start.') - - if data.get('validity_end') and data.get('validity_years'): - raise ValidationError('Cannot specify both validity end and validity years.') - - if data.get('validity_start') and data.get('validity_end'): - if not data['validity_start'] < data['validity_end']: - raise ValidationError('Validity start must be before validity end.') - - if data.get('validity_start').replace(tzinfo=None) < data['authority'].not_before: - raise ValidationError('Validity start must not be before {0}'.format(data['authority'].not_before)) - - if data.get('validity_end').replace(tzinfo=None) > data['authority'].not_after: - raise ValidationError('Validity end must not be after {0}'.format(data['authority'].not_after)) - - if data.get('validity_years'): - now = arrow.utcnow() - end = now.replace(years=+data['validity_years']) - - if now.naive < data['authority'].not_before: - raise ValidationError('Validity start must not be before {0}'.format(data['authority'].not_before)) - - if end.naive > data['authority'].not_after: - raise ValidationError('Validity end must not be after {0}'.format(data['authority'].not_after)) + validators.dates(data) class CertificateOutputSchema(LemurOutputSchema): @@ -276,9 +72,9 @@ class CertificateUploadInputSchema(LemurInputSchema): description = fields.String() active = fields.Boolean(missing=True) - private_key = fields.String(validate=validate_private_key) - public_cert = fields.String(required=True, validate=validate_public_certificate) - chain = fields.String(validate=validate_public_certificate) + private_key = fields.String(validate=validators.private_key) + public_cert = fields.String(required=True, validate=validators.public_certificate) + chain = fields.String(validate=validators.public_certificate) # TODO this could be multiple certificates destinations = fields.Nested(AssociatedDestinationSchema, missing=[], many=True) notifications = fields.Nested(AssociatedNotificationSchema, missing=[], many=True) diff --git a/lemur/certificates/service.py b/lemur/certificates/service.py index 00669b34..ba29b10b 100644 --- a/lemur/certificates/service.py +++ b/lemur/certificates/service.py @@ -253,7 +253,7 @@ def create(**kwargs): cert.name = kwargs['name'] database.create(cert) - cert.description = kwargs['description'] + cert.description = kwargs.get('description') g.user.certificates.append(cert) database.update(g.user) diff --git a/lemur/common/validators.py b/lemur/common/validators.py new file mode 100644 index 00000000..99ae5c19 --- /dev/null +++ b/lemur/common/validators.py @@ -0,0 +1,120 @@ + +import arrow +from marshmallow.exceptions import ValidationError + +from cryptography import x509 +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import serialization + +from lemur.domains import service as domain_service +from lemur.auth.permissions import SensitiveDomainPermission + + +def public_certificate(body): + """ + Determines if specified string is valid public certificate. + + :param body: + :return: + """ + try: + x509.load_pem_x509_certificate(bytes(body), default_backend()) + except Exception: + raise ValidationError('Public certificate presented is not valid.') + + +def private_key(key): + """ + User to validate that a given string is a RSA private key + + :param key: + :return: :raise ValueError: + """ + try: + serialization.load_pem_private_key(bytes(key), None, backend=default_backend()) + except Exception: + raise ValidationError('Private key presented is not valid.') + + +def sensitive_domain(domain): + """ + Determines if domain has been marked as sensitive. + :param domain: + :return: + """ + domains = domain_service.get_by_name(domain) + for domain in domains: + # we only care about non-admins + if not SensitiveDomainPermission().can(): + if domain.sensitive: + raise ValidationError( + 'Domain {0} has been marked as sensitive, contact and administrator \ + to issue the certificate.'.format(domain)) + + +def oid_type(oid_type): + """ + Determines if the specified oid type is valid. + :param oid_type: + :return: + """ + valid_types = ['b64asn1', 'string', 'ia5string'] + if oid_type.lower() not in [o_type.lower() for o_type in valid_types]: + raise ValidationError('Invalid Oid Type: {0} choose from {1}'.format(oid_type, ",".join(valid_types))) + + +def sub_alt_type(alt_type): + """ + Determines if the specified subject alternate type is valid. + :param alt_type: + :return: + """ + valid_types = ['DNSName', 'IPAddress', 'uniFormResourceIdentifier', 'directoryName', 'rfc822Name', 'registrationID', + 'otherName', 'x400Address', 'EDIPartyName'] + if alt_type.lower() not in [a_type.lower() for a_type in valid_types]: + raise ValidationError('Invalid SubAltName Type: {0} choose from {1}'.format(type, ",".join(valid_types))) + + +def csr(data): + """ + Determines if the CSR is valid. + :param data: + :return: + """ + try: + x509.load_pem_x509_csr(bytes(data), default_backend()) + except Exception: + raise ValidationError('CSR presented is not valid.') + + +def dates(data): + if not data.get('validity_start') and data.get('validity_end'): + raise ValidationError('If validity start is specified so must validity end.') + + if not data.get('validity_end') and data.get('validity_start'): + raise ValidationError('If validity end is specified so must validity start.') + + if data.get('validity_end') and data.get('validity_years'): + raise ValidationError('Cannot specify both validity end and validity years.') + + if data.get('validity_start') and data.get('validity_end'): + if not data['validity_start'] < data['validity_end']: + raise ValidationError('Validity start must be before validity end.') + + if data.get('authority'): + if data.get('validity_start').replace(tzinfo=None) < data['authority'].not_before: + raise ValidationError('Validity start must not be before {0}'.format(data['authority'].not_before)) + + if data.get('validity_end').replace(tzinfo=None) > data['authority'].not_after: + raise ValidationError('Validity end must not be after {0}'.format(data['authority'].not_after)) + + if data.get('validity_years'): + now = arrow.utcnow() + end = now.replace(years=+data['validity_years']) + + if data.get('authority'): + if now.naive < data['authority'].not_before: + raise ValidationError('Validity start must not be before {0}'.format(data['authority'].not_before)) + + if end.naive > data['authority'].not_after: + raise ValidationError('Validity end must not be after {0}'.format(data['authority'].not_after)) diff --git a/lemur/database.py b/lemur/database.py index f654d9f6..5caa8160 100644 --- a/lemur/database.py +++ b/lemur/database.py @@ -289,5 +289,7 @@ def sort_and_page(query, model, args): total = query.count() + # offset calculated at zero + page -= 1 items = query.offset(count * page).limit(count).all() return dict(items=items, total=total) diff --git a/lemur/schemas.py b/lemur/schemas.py index 9b05d138..6cf222da 100644 --- a/lemur/schemas.py +++ b/lemur/schemas.py @@ -7,26 +7,48 @@ .. moduleauthor:: Kevin Glisson """ -from marshmallow import fields, post_load +from marshmallow import fields, post_load, pre_load, post_dump, validates_schema + +from lemur.roles.models import Role from lemur.authorities.models import Authority from lemur.destinations.models import Destination from lemur.certificates.models import Certificate from lemur.notifications.models import Notification -from lemur.common.schema import LemurInputSchema + +from lemur.common import validators +from lemur.common.schema import LemurSchema, LemurInputSchema from lemur.plugins import plugins class AssociatedAuthoritySchema(LemurInputSchema): - id = fields.Int(required=True) + id = fields.Int() + name = fields.String() @post_load def get_object(self, data, many=False): - return Authority.query.filter(Authority.id == data['id']).one() + if data.get('id'): + return Authority.query.filter(Authority.id == data['id']).one() + elif data.get('name'): + return Authority.query.filter(Authority.name == data['name']).one() + + +class AssociatedRoleSchema(LemurInputSchema): + id = fields.Int(required=True) + name = fields.String() + + @post_load + def get_object(self, data, many=False): + if many: + ids = [d['id'] for d in data] + return Role.query.filter(Role.id.in_(ids)).all() + else: + return Role.query.filter(Role.id == data['id']).one() class AssociatedDestinationSchema(LemurInputSchema): id = fields.Int(required=True) + name = fields.String() @post_load def get_object(self, data, many=False): @@ -71,3 +93,95 @@ class PluginSchema(LemurInputSchema): return [plugins.get(plugin['slug']) for plugin in data] else: return plugins.get(data['slug']) + + +class BaseExtensionSchema(LemurSchema): + @pre_load(pass_many=True) + def preprocess(self, data, many): + return self.under(data, many=many) + + @post_dump(pass_many=True) + def post_process(self, data, many): + if data: + data = self.camel(data, many=many) + return data + + +class BasicConstraintsSchema(BaseExtensionSchema): + pass + + +class AuthorityIdentifierSchema(BaseExtensionSchema): + use_authority_cert = fields.Boolean() + + +class AuthorityKeyIdentifierSchema(BaseExtensionSchema): + use_key_identifier = fields.Boolean() + + +class CertificateInfoAccessSchema(BaseExtensionSchema): + include_aia = fields.Boolean() + + @post_dump + def handle_keys(self, data): + return {'includeAIA': data['include_aia']} + + +class KeyUsageSchema(BaseExtensionSchema): + use_crl_sign = fields.Boolean() + use_data_encipherment = fields.Boolean() + use_decipher_only = fields.Boolean() + use_encipher_only = fields.Boolean() + use_key_encipherment = fields.Boolean() + use_digital_signature = fields.Boolean() + use_non_repudiation = fields.Boolean() + + +class ExtendedKeyUsageSchema(BaseExtensionSchema): + use_server_authentication = fields.Boolean() + use_client_authentication = fields.Boolean() + use_eap_over_lan = fields.Boolean() + use_eap_over_ppp = fields.Boolean() + use_ocsp_signing = fields.Boolean() + use_smart_card_authentication = fields.Boolean() + use_timestamping = fields.Boolean() + + +class SubjectKeyIdentifierSchema(BaseExtensionSchema): + include_ski = fields.Boolean() + + @post_dump + def handle_keys(self, data): + return {'includeSKI': data['include_ski']} + + +class SubAltNameSchema(BaseExtensionSchema): + name_type = fields.String(validate=validators.sub_alt_type) + value = fields.String() + + @validates_schema + def check_sensitive(self, data): + if data['name_type'] == 'DNSName': + validators.sensitive_domain(data['value']) + + +class SubAltNamesSchema(BaseExtensionSchema): + names = fields.Nested(SubAltNameSchema, many=True) + + +class CustomOIDSchema(BaseExtensionSchema): + oid = fields.String() + oid_type = fields.String(validate=validators.oid_type) + value = fields.String() + + +class ExtensionSchema(BaseExtensionSchema): + basic_constraints = fields.Nested(BasicConstraintsSchema) + key_usage = fields.Nested(KeyUsageSchema) + extended_key_usage = fields.Nested(ExtendedKeyUsageSchema) + subject_key_identifier = fields.Nested(SubjectKeyIdentifierSchema) + sub_alt_names = fields.Nested(SubAltNamesSchema) + authority_identifier = fields.Nested(AuthorityIdentifierSchema) + authority_key_identifier = fields.Nested(AuthorityKeyIdentifierSchema) + certificate_info_access = fields.Nested(CertificateInfoAccessSchema) + custom = fields.List(fields.Nested(CustomOIDSchema)) diff --git a/lemur/static/app/angular/authorities/authority/authority.js b/lemur/static/app/angular/authorities/authority/authority.js index 688a33a4..93b0aa6d 100644 --- a/lemur/static/app/angular/authorities/authority/authority.js +++ b/lemur/static/app/angular/authorities/authority/authority.js @@ -4,7 +4,6 @@ angular.module('lemur') .controller('AuthorityEditController', function ($scope, $modalInstance, AuthorityApi, AuthorityService, RoleService, toaster, editId){ AuthorityApi.get(editId).then(function (authority) { - AuthorityService.getRoles(authority); $scope.authority = authority; }); @@ -69,6 +68,7 @@ angular.module('lemur') PluginService.getByType('issuer').then(function (plugins) { $scope.plugins = plugins; + $scope.authority.plugin = plugins[0]; }); $scope.roleService = RoleService; diff --git a/lemur/static/app/angular/authorities/authority/distinguishedName.tpl.html b/lemur/static/app/angular/authorities/authority/distinguishedName.tpl.html index 302f01a4..c5ce6ff3 100644 --- a/lemur/static/app/angular/authorities/authority/distinguishedName.tpl.html +++ b/lemur/static/app/angular/authorities/authority/distinguishedName.tpl.html @@ -6,7 +6,7 @@ Country
- +

You must enter a country

@@ -16,7 +16,7 @@ State
- +

You must enter a state

@@ -26,7 +26,7 @@ Location
- +

You must enter a location

@@ -36,7 +36,7 @@ Organization
- +

You must enter a organization

@@ -46,7 +46,7 @@ Organizational Unit
- +

You must enter a organizational unit

diff --git a/lemur/static/app/angular/authorities/authority/options.tpl.html b/lemur/static/app/angular/authorities/authority/options.tpl.html index 26c6fdc6..57fc29e6 100644 --- a/lemur/static/app/angular/authorities/authority/options.tpl.html +++ b/lemur/static/app/angular/authorities/authority/options.tpl.html @@ -1,30 +1,10 @@
-
- -
- -
-
-
- -
- -
-
- +
@@ -32,7 +12,7 @@ Sensitivity
- +
@@ -43,7 +23,7 @@
-
+
@@ -56,7 +36,7 @@ Serial Number
- +
@@ -64,7 +44,7 @@ First Serial Number
- +
@@ -72,7 +52,7 @@ Plugin
- +
diff --git a/lemur/static/app/angular/authorities/authority/tracking.tpl.html b/lemur/static/app/angular/authorities/authority/tracking.tpl.html index b019bcf9..8bf2aac3 100644 --- a/lemur/static/app/angular/authorities/authority/tracking.tpl.html +++ b/lemur/static/app/angular/authorities/authority/tracking.tpl.html @@ -1,33 +1,33 @@
+ ng-class="{'has-error': trackingForm.name.$invalid, 'has-success': !trackingForm.name.$invalid&&trackingForm.name.$dirty}">
- -

You must enter a valid authority name, spaces are not allowed

+ +

You must enter a valid authority name, spaces are not allowed

+ ng-class="{'has-error': trackingForm.owner.$invalid, 'has-success': !trackingForm.$invalid&&trackingForm.owner.$dirty}">
- -

You must enter an Certificate Authority owner

+ +

You must enter an Certificate Authority owner

+ ng-class="{'has-error': trackingForm.description.$invalid, 'has-success': !trackingForm.$invalid&&trackingForm.description.$dirty}">
- -

You must give a short description about this authority will be used for

+ +

You must give a short description about this authority will be used for

- +

You must enter a common name and it must be less than 64 characters in length

+
+ +
+ +
+
+
+ +
+ +
+
-
+
+ +
+ + - or - + +
- +

A start date is required!

@@ -56,11 +88,12 @@
- -
+ +
- +

A end date is required!

diff --git a/lemur/static/app/angular/authorities/services.js b/lemur/static/app/angular/authorities/services.js index 0566454d..e7da9c85 100644 --- a/lemur/static/app/angular/authorities/services.js +++ b/lemur/static/app/angular/authorities/services.js @@ -94,11 +94,11 @@ angular.module('lemur') AuthorityService.getDefaults = function (authority) { return DefaultService.get().then(function (defaults) { - authority.caDN.country = defaults.country; - authority.caDN.state = defaults.state; - authority.caDN.location = defaults.location; - authority.caDN.organization = defaults.organization; - authority.caDN.organizationalUnit = defaults.organizationalUnit; + authority.country = defaults.country; + authority.state = defaults.state; + authority.location = defaults.location; + authority.organization = defaults.organization; + authority.organizationalUnit = defaults.organizationalUnit; }); }; diff --git a/lemur/static/app/angular/authorities/view/view.js b/lemur/static/app/angular/authorities/view/view.js index f7dbeabe..a3544bc3 100644 --- a/lemur/static/app/angular/authorities/view/view.js +++ b/lemur/static/app/angular/authorities/view/view.js @@ -29,9 +29,6 @@ angular.module('lemur') total: 0, // length of data getData: function ($defer, params) { AuthorityApi.getList(params.url()).then(function (data) { - _.each(data, function(authority) { - AuthorityService.getRoles(authority); - }); params.total(data.total); $defer.resolve(data); }); diff --git a/lemur/static/app/angular/certificates/certificate/tracking.tpl.html b/lemur/static/app/angular/certificates/certificate/tracking.tpl.html index 6477b49f..22d9d845 100644 --- a/lemur/static/app/angular/certificates/certificate/tracking.tpl.html +++ b/lemur/static/app/angular/certificates/certificate/tracking.tpl.html @@ -104,11 +104,11 @@ - or - -
-
- +
+
+ @@ -117,13 +117,12 @@
- -
-
-
- +
+
+
+ diff --git a/lemur/static/app/angular/certificates/view/view.tpl.html b/lemur/static/app/angular/certificates/view/view.tpl.html index 9683e813..d2989388 100644 --- a/lemur/static/app/angular/certificates/view/view.tpl.html +++ b/lemur/static/app/angular/certificates/view/view.tpl.html @@ -53,7 +53,7 @@
- + @@ -80,8 +80,8 @@
  • San - - + +
  • @@ -101,9 +101,9 @@ class="list-group-item"> Validity - Unknown - Revoked - Valid + Unknown + Revoked + Valid
  • diff --git a/lemur/tests/conftest.py b/lemur/tests/conftest.py index f7fd6b28..d437dc73 100644 --- a/lemur/tests/conftest.py +++ b/lemur/tests/conftest.py @@ -103,6 +103,13 @@ def certificate(session): return c +@pytest.fixture +def role(session): + r = RoleFactory() + session.commit() + return r + + @pytest.yield_fixture(scope="function") def logged_in_user(app, user): with app.test_request_context(): diff --git a/lemur/tests/test_authorities.py b/lemur/tests/test_authorities.py index 8e29dbd2..78c1fbb2 100644 --- a/lemur/tests/test_authorities.py +++ b/lemur/tests/test_authorities.py @@ -1,150 +1,160 @@ + +import pytest from lemur.authorities.views import * # noqa -# def test_crud(session): -# role = create('role1') -# assert role.id > 0 -# -# role = update(role.id, 'role_new', None, []) -# assert role.name == 'role_new' -# delete(role.id) -# assert get(role.id) == None - - -def test_authority_get(client): - assert client.get(api.url_for(Authorities, authority_id=1)).status_code == 401 - - -def test_authority_post(client): - assert client.post(api.url_for(Authorities, authority_id=1), data={}).status_code == 405 - - -def test_authority_put(client): - assert client.put(api.url_for(Authorities, authority_id=1), data={}).status_code == 401 - - -def test_authority_delete(client): - assert client.delete(api.url_for(Authorities, authority_id=1)).status_code == 405 - - -def test_authority_patch(client): - assert client.patch(api.url_for(Authorities, authority_id=1), data={}).status_code == 405 - - -def test_authorities_get(client): - assert client.get(api.url_for(AuthoritiesList)).status_code == 401 - - -def test_authorities_post(client): - assert client.post(api.url_for(AuthoritiesList), data={}).status_code == 401 - - -def test_authorities_put(client): - assert client.put(api.url_for(AuthoritiesList), data={}).status_code == 405 - - -def test_authorities_delete(client): - assert client.delete(api.url_for(AuthoritiesList)).status_code == 405 - - -def test_authorities_patch(client): - assert client.patch(api.url_for(AuthoritiesList), data={}).status_code == 405 - - -def test_certificate_authorities_get(client): - assert client.get(api.url_for(AuthoritiesList)).status_code == 401 - - -def test_certificate_authorities_post(client): - assert client.post(api.url_for(AuthoritiesList), data={}).status_code == 401 - - -def test_certificate_authorities_put(client): - assert client.put(api.url_for(AuthoritiesList), data={}).status_code == 405 - - -def test_certificate_authorities_delete(client): - assert client.delete(api.url_for(AuthoritiesList)).status_code == 405 - - -def test_certificate_authorities_patch(client): - assert client.patch(api.url_for(AuthoritiesList), data={}).status_code == 405 - - -VALID_USER_HEADER_TOKEN = { - 'Authorization': 'Basic ' + 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE0MzUyMzMzNjksInN1YiI6MSwiZXhwIjoxNTIxNTQ2OTY5fQ.1qCi0Ip7mzKbjNh0tVd3_eJOrae3rNa_9MCVdA4WtQI'} - - -def test_auth_authority_get(client): - assert client.get(api.url_for(Authorities, authority_id=1), headers=VALID_USER_HEADER_TOKEN).status_code == 200 - - -def test_auth_authority_post_(client): - assert client.post(api.url_for(Authorities, authority_id=1), data={}, headers=VALID_USER_HEADER_TOKEN).status_code == 405 - - -def test_auth_authority_put(client): - assert client.put(api.url_for(Authorities, authority_id=1), data={}, headers=VALID_USER_HEADER_TOKEN).status_code == 400 - - -def test_auth_authority_delete(client): - assert client.delete(api.url_for(Authorities, authority_id=1), headers=VALID_USER_HEADER_TOKEN).status_code == 405 - - -def test_auth_authority_patch(client): - assert client.patch(api.url_for(Authorities, authority_id=1), data={}, headers=VALID_USER_HEADER_TOKEN).status_code == 405 - - -def test_auth_authorities_get(client): - assert client.get(api.url_for(AuthoritiesList), headers=VALID_USER_HEADER_TOKEN).status_code == 200 - - -def test_auth_authorities_post(client): - assert client.post(api.url_for(AuthoritiesList), data={}, headers=VALID_USER_HEADER_TOKEN).status_code == 400 - - -def test_auth_certificates_authorities_get(client): - assert client.get(api.url_for(CertificateAuthority, certificate_id=1), headers=VALID_USER_HEADER_TOKEN).status_code == 404 - - -VALID_ADMIN_HEADER_TOKEN = { - 'Authorization': 'Basic ' + 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE0MzUyNTAyMTgsInN1YiI6MiwiZXhwIjoxNTIxNTYzODE4fQ.6mbq4-Ro6K5MmuNiTJBB153RDhlM5LGJBjI7GBKkfqA'} - - -def test_admin_authority_get(client): - assert client.get(api.url_for(Authorities, authority_id=1), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 200 - - -def test_admin_authority_post(client): - assert client.post(api.url_for(Authorities, authority_id=1), data={}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405 - - -def test_admin_authority_put(client): - assert client.put(api.url_for(Authorities, authority_id=1), data={}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 400 - - -def test_admin_authority_delete(client): - assert client.delete(api.url_for(Authorities, authority_id=1), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405 - - -def test_admin_authority_patch(client): - assert client.patch(api.url_for(Authorities, authority_id=1), data={}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405 - - -def test_admin_authorities_get(client): - assert client.get(api.url_for(AuthoritiesList), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 200 - - -def test_admin_authorities_post(client): - assert client.post(api.url_for(AuthoritiesList), data={}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 400 - - -def test_admin_authorities_put(client): - assert client.put(api.url_for(AuthoritiesList), data={}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405 - - -def test_admin_authorities_delete(client): - assert client.delete(api.url_for(AuthoritiesList), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405 - - -def test_admin_certificate_authorities_get(client): - assert client.get(api.url_for(CertificateAuthority, certificate_id=1), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 404 +from .vectors import VALID_ADMIN_HEADER_TOKEN, VALID_USER_HEADER_TOKEN + + +def test_authority_input_schema(client, role): + from lemur.authorities.schemas import AuthorityInputSchema + + input_data = { + 'name': 'Example Authority', + 'owner': 'jim@example.com', + 'description': 'An example authority.', + 'commonName': 'AnExampleAuthority', + 'pluginName': {'slug': 'verisign-issuer'}, + 'type': 'root', + 'signingAlgorithm': 'sha256WithRSA', + 'keyType': 'RSA2048', + 'sensitivity': 'medium' + } + + data, errors = AuthorityInputSchema().load(input_data) + + assert not errors + + +@pytest.mark.parametrize("token,status", [ + (VALID_USER_HEADER_TOKEN, 404), + (VALID_ADMIN_HEADER_TOKEN, 404), + ('', 401) +]) +def test_authority_get(client, token, status): + assert client.get(api.url_for(Authorities, authority_id=1), headers=token).status_code == status + + +@pytest.mark.parametrize("token,status", [ + (VALID_USER_HEADER_TOKEN, 405), + (VALID_ADMIN_HEADER_TOKEN, 405), + ('', 405) +]) +def test_authority_post(client, token, status): + assert client.post(api.url_for(Authorities, authority_id=1), data={}, headers=token).status_code == status + + +@pytest.mark.parametrize("token,status", [ + (VALID_USER_HEADER_TOKEN, 400), + (VALID_ADMIN_HEADER_TOKEN, 400), + ('', 401) +]) +def test_authority_put(client, token, status): + assert client.put(api.url_for(Authorities, authority_id=1), data={}, headers=token).status_code == status + + +@pytest.mark.parametrize("token,status", [ + (VALID_USER_HEADER_TOKEN, 405), + (VALID_ADMIN_HEADER_TOKEN, 405), + ('', 405) +]) +def test_authority_delete(client, token, status): + assert client.delete(api.url_for(Authorities, authority_id=1), headers=token).status_code == status + + +@pytest.mark.parametrize("token,status", [ + (VALID_USER_HEADER_TOKEN, 405), + (VALID_ADMIN_HEADER_TOKEN, 405), + ('', 405) +]) +def test_authority_patch(client, token, status): + assert client.patch(api.url_for(Authorities, authority_id=1), data={}, headers=token).status_code == status + + +@pytest.mark.parametrize("token,status", [ + (VALID_USER_HEADER_TOKEN, 200), + (VALID_ADMIN_HEADER_TOKEN, 200), + ('', 401) +]) +def test_authorities_get(client, token, status): + assert client.get(api.url_for(AuthoritiesList), headers=token).status_code == status + + +@pytest.mark.parametrize("token,status", [ + (VALID_USER_HEADER_TOKEN, 400), + (VALID_ADMIN_HEADER_TOKEN, 400), + ('', 401) +]) +def test_authorities_post(client, token, status): + assert client.post(api.url_for(AuthoritiesList), data={}, headers=token).status_code == status + + +@pytest.mark.parametrize("token,status", [ + (VALID_USER_HEADER_TOKEN, 405), + (VALID_ADMIN_HEADER_TOKEN, 405), + ('', 405) +]) +def test_authorities_put(client, token, status): + assert client.put(api.url_for(AuthoritiesList), data={}, headers=token).status_code == status + + +@pytest.mark.parametrize("token,status", [ + (VALID_USER_HEADER_TOKEN, 405), + (VALID_ADMIN_HEADER_TOKEN, 405), + ('', 405) +]) +def test_authorities_delete(client, token, status): + assert client.delete(api.url_for(AuthoritiesList), headers=token).status_code == status + + +@pytest.mark.parametrize("token,status", [ + (VALID_USER_HEADER_TOKEN, 405), + (VALID_ADMIN_HEADER_TOKEN, 405), + ('', 405) +]) +def test_authorities_patch(client, token, status): + assert client.patch(api.url_for(AuthoritiesList), data={}, headers=token).status_code == status + + +@pytest.mark.parametrize("token,status", [ + (VALID_USER_HEADER_TOKEN, 200), + (VALID_ADMIN_HEADER_TOKEN, 200), + ('', 401) +]) +def test_certificate_authorities_get(client, token, status): + assert client.get(api.url_for(AuthoritiesList), headers=token).status_code == status + + +@pytest.mark.parametrize("token,status", [ + (VALID_USER_HEADER_TOKEN, 400), + (VALID_ADMIN_HEADER_TOKEN, 400), + ('', 401) +]) +def test_certificate_authorities_post(client, token, status): + assert client.post(api.url_for(AuthoritiesList), data={}, headers=token).status_code == status + + +@pytest.mark.parametrize("token,status", [ + (VALID_USER_HEADER_TOKEN, 405), + (VALID_ADMIN_HEADER_TOKEN, 405), + ('', 405) +]) +def test_certificate_authorities_put(client, token, status): + assert client.put(api.url_for(AuthoritiesList), data={}, headers=token).status_code == status + + +@pytest.mark.parametrize("token,status", [ + (VALID_USER_HEADER_TOKEN, 405), + (VALID_ADMIN_HEADER_TOKEN, 405), + ('', 405) +]) +def test_certificate_authorities_delete(client, token, status): + assert client.delete(api.url_for(AuthoritiesList), headers=token).status_code == status + + +@pytest.mark.parametrize("token,status", [ + (VALID_USER_HEADER_TOKEN, 405), + (VALID_ADMIN_HEADER_TOKEN, 405), + ('', 405) +]) +def test_certificate_authorities_patch(client, token, status): + assert client.patch(api.url_for(AuthoritiesList), data={}, headers=token).status_code == status diff --git a/lemur/tests/test_certificates.py b/lemur/tests/test_certificates.py index 4ea5124a..50f052ac 100644 --- a/lemur/tests/test_certificates.py +++ b/lemur/tests/test_certificates.py @@ -2,13 +2,14 @@ from __future__ import unicode_literals # at top of module import pytest import json + from lemur.certificates.views import * # noqa from .vectors import VALID_ADMIN_HEADER_TOKEN, VALID_USER_HEADER_TOKEN def test_authority_identifier_schema(): - from lemur.certificates.schemas import AuthorityIdentifierSchema + from lemur.schemas import AuthorityIdentifierSchema input_data = {'useAuthorityCert': True} data, errors = AuthorityIdentifierSchema().load(input_data) @@ -22,7 +23,7 @@ def test_authority_identifier_schema(): def test_authority_key_identifier_schema(): - from lemur.certificates.schemas import AuthorityKeyIdentifierSchema + from lemur.schemas import AuthorityKeyIdentifierSchema input_data = {'useKeyIdentifier': True} data, errors = AuthorityKeyIdentifierSchema().load(input_data) @@ -36,7 +37,7 @@ def test_authority_key_identifier_schema(): def test_certificate_info_access_schema(): - from lemur.certificates.schemas import CertificateInfoAccessSchema + from lemur.schemas import CertificateInfoAccessSchema input_data = {'includeAIA': True} data, errors = CertificateInfoAccessSchema().load(input_data) @@ -49,7 +50,7 @@ def test_certificate_info_access_schema(): def test_subject_key_identifier_schema(): - from lemur.certificates.schemas import SubjectKeyIdentifierSchema + from lemur.schemas import SubjectKeyIdentifierSchema input_data = {'includeSKI': True} @@ -61,7 +62,7 @@ def test_subject_key_identifier_schema(): assert data == input_data -def test_extension_schema(): +def test_extension_schema(client): from lemur.certificates.schemas import ExtensionSchema input_data = { @@ -194,7 +195,7 @@ def test_certificate_valid_dates(client, authority): def test_sub_alt_name_schema(): - from lemur.certificates.schemas import SubAltNameSchema, SubAltNamesSchema + from lemur.schemas import SubAltNameSchema, SubAltNamesSchema input_data = {'nameType': 'DNSName', 'value': 'test.example.com'} data, errors = SubAltNameSchema().load(input_data) @@ -217,7 +218,7 @@ def test_sub_alt_name_schema(): def test_key_usage_schema(): - from lemur.certificates.schemas import KeyUsageSchema + from lemur.schemas import KeyUsageSchema input_data = { 'useCRLSign': True, @@ -244,7 +245,7 @@ def test_key_usage_schema(): def test_extended_key_usage_schema(): - from lemur.certificates.schemas import ExtendedKeyUsageSchema + from lemur.schemas import ExtendedKeyUsageSchema input_data = { 'useServerAuthentication': True,