Fixing several small issues. (#341)
* Fixing several small issues. * Fixing tests.
This commit is contained in:
parent
b2539b843b
commit
72e3fb5bfe
|
@ -23,8 +23,8 @@ class AuthorityInputSchema(LemurInputSchema):
|
||||||
description = fields.String()
|
description = fields.String()
|
||||||
common_name = fields.String(required=True, validate=validators.sensitive_domain)
|
common_name = fields.String(required=True, validate=validators.sensitive_domain)
|
||||||
|
|
||||||
validity_start = fields.DateTime()
|
validity_start = fields.Date()
|
||||||
validity_end = fields.DateTime()
|
validity_end = fields.Date()
|
||||||
validity_years = fields.Integer()
|
validity_years = fields.Integer()
|
||||||
|
|
||||||
# certificate body fields
|
# certificate body fields
|
||||||
|
|
|
@ -45,22 +45,24 @@ def mint(**kwargs):
|
||||||
"""
|
"""
|
||||||
issuer = kwargs['plugin']['plugin_object']
|
issuer = kwargs['plugin']['plugin_object']
|
||||||
body, chain, roles = issuer.create_authority(kwargs)
|
body, chain, roles = issuer.create_authority(kwargs)
|
||||||
|
roles = create_authority_roles(roles, kwargs['owner'], kwargs['plugin']['plugin_object'].title)
|
||||||
return body, chain, roles
|
return body, chain, roles
|
||||||
|
|
||||||
|
|
||||||
def create_authority_roles(**kwargs):
|
def create_authority_roles(roles, owner, plugin_title):
|
||||||
"""
|
"""
|
||||||
Creates all of the necessary authority roles.
|
Creates all of the necessary authority roles.
|
||||||
:param roles:
|
:param roles:
|
||||||
:param kwargs:
|
|
||||||
:return:
|
:return:
|
||||||
"""
|
"""
|
||||||
role_objs = []
|
role_objs = []
|
||||||
for r in kwargs['roles']:
|
for r in roles:
|
||||||
|
role = role_service.get_by_name(r['name'])
|
||||||
|
if not role:
|
||||||
role = role_service.create(
|
role = role_service.create(
|
||||||
r['name'],
|
r['name'],
|
||||||
password=r['password'],
|
password=r['password'],
|
||||||
description="Auto generated role for {0}".format(kwargs['plugin']['plugin_object'].title),
|
description="Auto generated role for {0}".format(plugin_title),
|
||||||
username=r['username'])
|
username=r['username'])
|
||||||
|
|
||||||
# the user creating the authority should be able to administer it
|
# the user creating the authority should be able to administer it
|
||||||
|
@ -70,11 +72,11 @@ def create_authority_roles(**kwargs):
|
||||||
role_objs.append(role)
|
role_objs.append(role)
|
||||||
|
|
||||||
# create an role for the owner and assign it
|
# create an role for the owner and assign it
|
||||||
owner_role = role_service.get_by_name(kwargs['owner'])
|
owner_role = role_service.get_by_name(owner)
|
||||||
if not owner_role:
|
if not owner_role:
|
||||||
owner_role = role_service.create(
|
owner_role = role_service.create(
|
||||||
kwargs['owner'],
|
owner,
|
||||||
description="Auto generated role based on owner: {0}".format(kwargs['owner'])
|
description="Auto generated role based on owner: {0}".format(owner)
|
||||||
)
|
)
|
||||||
|
|
||||||
role_objs.append(owner_role)
|
role_objs.append(owner_role)
|
||||||
|
@ -96,8 +98,6 @@ def create(**kwargs):
|
||||||
else:
|
else:
|
||||||
kwargs['roles'] = roles
|
kwargs['roles'] = roles
|
||||||
|
|
||||||
kwargs['roles'] = create_authority_roles(**kwargs)
|
|
||||||
|
|
||||||
if kwargs['type'] == 'subca':
|
if kwargs['type'] == 'subca':
|
||||||
description = "This is the ROOT certificate for the {0} sub certificate authority the parent \
|
description = "This is the ROOT certificate for the {0} sub certificate authority the parent \
|
||||||
authority is {1}.".format(kwargs.get('name'), kwargs.get('parent'))
|
authority is {1}.".format(kwargs.get('name'), kwargs.get('parent'))
|
||||||
|
@ -162,11 +162,8 @@ def get_authority_role(ca_name):
|
||||||
# TODO we should pick admin ca roles for admin
|
# TODO we should pick admin ca roles for admin
|
||||||
return authority.roles[0]
|
return authority.roles[0]
|
||||||
else:
|
else:
|
||||||
for role in g.current_user.roles:
|
authority = get_by_name(ca_name)
|
||||||
if role.authority:
|
return authority.roles[1]
|
||||||
for authority in role.authorities:
|
|
||||||
if authority.name == ca_name:
|
|
||||||
return role
|
|
||||||
|
|
||||||
|
|
||||||
def render(args):
|
def render(args):
|
||||||
|
|
|
@ -125,10 +125,7 @@ def create_certificate_roles(**kwargs):
|
||||||
description="Auto generated role based on owner: {0}".format(kwargs['owner'])
|
description="Auto generated role based on owner: {0}".format(kwargs['owner'])
|
||||||
)
|
)
|
||||||
|
|
||||||
if kwargs.get('roles'):
|
return [owner_role]
|
||||||
kwargs['roles'].append(owner_role)
|
|
||||||
|
|
||||||
return kwargs
|
|
||||||
|
|
||||||
|
|
||||||
def mint(**kwargs):
|
def mint(**kwargs):
|
||||||
|
@ -180,7 +177,12 @@ def upload(**kwargs):
|
||||||
"""
|
"""
|
||||||
Allows for pre-made certificates to be imported into Lemur.
|
Allows for pre-made certificates to be imported into Lemur.
|
||||||
"""
|
"""
|
||||||
kwargs = create_certificate_roles(**kwargs)
|
roles = create_certificate_roles(**kwargs)
|
||||||
|
|
||||||
|
if kwargs.get('roles'):
|
||||||
|
kwargs['roles'] += roles
|
||||||
|
else:
|
||||||
|
kwargs['roles'] = roles
|
||||||
|
|
||||||
cert = Certificate(**kwargs)
|
cert = Certificate(**kwargs)
|
||||||
|
|
||||||
|
@ -205,7 +207,12 @@ def create(**kwargs):
|
||||||
kwargs['private_key'] = private_key
|
kwargs['private_key'] = private_key
|
||||||
kwargs['chain'] = cert_chain
|
kwargs['chain'] = cert_chain
|
||||||
|
|
||||||
kwargs = create_certificate_roles(**kwargs)
|
roles = create_certificate_roles(**kwargs)
|
||||||
|
|
||||||
|
if kwargs.get('roles'):
|
||||||
|
kwargs['roles'] += roles
|
||||||
|
else:
|
||||||
|
kwargs['roles'] = roles
|
||||||
|
|
||||||
cert = Certificate(**kwargs)
|
cert = Certificate(**kwargs)
|
||||||
|
|
||||||
|
@ -214,6 +221,7 @@ def create(**kwargs):
|
||||||
cert.name = kwargs['name']
|
cert.name = kwargs['name']
|
||||||
|
|
||||||
g.user.certificates.append(cert)
|
g.user.certificates.append(cert)
|
||||||
|
cert.authority = kwargs['authority']
|
||||||
database.commit()
|
database.commit()
|
||||||
|
|
||||||
metrics.send('certificate_issued', 'counter', 1, metric_tags=dict(owner=cert.owner, issuer=cert.issuer))
|
metrics.send('certificate_issued', 'counter', 1, metric_tags=dict(owner=cert.owner, issuer=cert.issuer))
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
|
import sys
|
||||||
from flask import current_app
|
from flask import current_app
|
||||||
from cryptography import x509
|
from cryptography import x509
|
||||||
from cryptography.hazmat.backends import default_backend
|
from cryptography.hazmat.backends import default_backend
|
||||||
|
@ -6,6 +6,9 @@ from lemur.constants import SAN_NAMING_TEMPLATE, DEFAULT_NAMING_TEMPLATE
|
||||||
|
|
||||||
|
|
||||||
def parse_certificate(body):
|
def parse_certificate(body):
|
||||||
|
if sys.version_info >= (3, 0):
|
||||||
|
return x509.load_pem_x509_certificate(body, default_backend())
|
||||||
|
else:
|
||||||
return x509.load_pem_x509_certificate(bytes(body), default_backend())
|
return x509.load_pem_x509_certificate(bytes(body), default_backend())
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -102,10 +102,10 @@ def dates(data):
|
||||||
raise ValidationError('Validity start must be before validity end.')
|
raise ValidationError('Validity start must be before validity end.')
|
||||||
|
|
||||||
if data.get('authority'):
|
if data.get('authority'):
|
||||||
if data.get('validity_start').replace(tzinfo=None) < data['authority'].authority_certificate.not_before:
|
if data.get('validity_start').replace(hour=0, minute=0, second=0, tzinfo=None) < data['authority'].authority_certificate.not_before.replace(hour=0, minute=0, second=0):
|
||||||
raise ValidationError('Validity start must not be before {0}'.format(data['authority'].authority_certificate.not_before))
|
raise ValidationError('Validity start must not be before {0}'.format(data['authority'].authority_certificate.not_before))
|
||||||
|
|
||||||
if data.get('validity_end').replace(tzinfo=None) > data['authority'].authority_certificate.not_after:
|
if data.get('validity_end').replace(hour=0, minute=0, second=0, tzinfo=None) > data['authority'].authority_certificate.not_after.replace(hour=0, minute=0, second=0):
|
||||||
raise ValidationError('Validity end must not be after {0}'.format(data['authority'].authority_certificate.not_after))
|
raise ValidationError('Validity end must not be after {0}'.format(data['authority'].authority_certificate.not_after))
|
||||||
|
|
||||||
if data.get('validity_years'):
|
if data.get('validity_years'):
|
||||||
|
|
|
@ -187,8 +187,8 @@ def test_certificate_valid_dates(client, authority):
|
||||||
'owner': 'jim@example.com',
|
'owner': 'jim@example.com',
|
||||||
'authority': {'id': authority.id},
|
'authority': {'id': authority.id},
|
||||||
'description': 'testtestest',
|
'description': 'testtestest',
|
||||||
'validityStart': '2020-01-01T00:21:34.513631',
|
'validityStart': '2020-01-01T00:00:00',
|
||||||
'validityEnd': '2020-01-01T00:22:34.513631'
|
'validityEnd': '2020-01-01T00:00:01'
|
||||||
}
|
}
|
||||||
|
|
||||||
data, errors = CertificateInputSchema().load(input_data)
|
data, errors = CertificateInputSchema().load(input_data)
|
||||||
|
|
Loading…
Reference in New Issue