Closes 262 (#324)
Moves the authority -> role relationship from a 1 -> many to a many -> many. This will allow one role to control and have access to many authorities.
This commit is contained in:
parent
112c6252d6
commit
615df76dd5
@ -14,7 +14,8 @@ from sqlalchemy import Column, Integer, String, Text, func, ForeignKey, DateTime
|
||||
from sqlalchemy.dialects.postgresql import JSON
|
||||
|
||||
from lemur.database import db
|
||||
from lemur.certificates.models import get_cn, get_not_after, get_not_before
|
||||
from lemur.models import roles_authorities
|
||||
from lemur.common import defaults
|
||||
|
||||
|
||||
class Authority(db.Model):
|
||||
@ -33,28 +34,21 @@ class Authority(db.Model):
|
||||
plugin_name = Column(String(64))
|
||||
description = Column(Text)
|
||||
options = Column(JSON)
|
||||
roles = relationship('Role', backref=db.backref('authority'), lazy='dynamic')
|
||||
roles = relationship('Role', secondary=roles_authorities, passive_deletes=True, backref=db.backref('authority'), lazy='dynamic')
|
||||
user_id = Column(Integer, ForeignKey('users.id'))
|
||||
certificates = relationship("Certificate", backref='authority')
|
||||
|
||||
def __init__(self, name, owner, plugin_name, body, roles=None, chain=None, description=None):
|
||||
cert = x509.load_pem_x509_certificate(bytes(body), default_backend())
|
||||
self.name = name
|
||||
self.body = body
|
||||
self.chain = chain
|
||||
self.owner = owner
|
||||
self.description = description
|
||||
self.plugin_name = plugin_name
|
||||
cert = x509.load_pem_x509_certificate(bytes(body), default_backend())
|
||||
self.cn = get_cn(cert)
|
||||
self.not_before = get_not_before(cert)
|
||||
self.not_after = get_not_after(cert)
|
||||
self.cn = defaults.common_name(cert)
|
||||
self.not_before = defaults.not_before(cert)
|
||||
self.not_after = defaults.not_after(cert)
|
||||
|
||||
if roles:
|
||||
self.roles = roles
|
||||
|
||||
def as_dict(self):
|
||||
return {c.name: getattr(self, c.name) for c in self.__table__.columns}
|
||||
|
||||
def serialize(self):
|
||||
blob = self.as_dict()
|
||||
return blob
|
||||
|
@ -63,6 +63,7 @@ class AuthorityInputSchema(LemurInputSchema):
|
||||
class AuthorityUpdateSchema(LemurInputSchema):
|
||||
owner = fields.Email()
|
||||
description = fields.String()
|
||||
active = fields.Boolean()
|
||||
roles = fields.Nested(AssociatedRoleSchema(many=True))
|
||||
|
||||
|
||||
|
@ -16,7 +16,6 @@ from lemur.authorities.models import Authority
|
||||
from lemur.roles import service as role_service
|
||||
from lemur.notifications import service as notification_service
|
||||
|
||||
from lemur.roles.models import Role
|
||||
from lemur.certificates.models import Certificate
|
||||
|
||||
|
||||
@ -29,8 +28,9 @@ def update(authority_id, description=None, owner=None, active=None, roles=None):
|
||||
:return:
|
||||
"""
|
||||
authority = get(authority_id)
|
||||
|
||||
if roles:
|
||||
authority = database.update_list(authority, 'roles', Role, roles)
|
||||
authority.roles = roles
|
||||
|
||||
if active:
|
||||
authority.active = active
|
||||
@ -52,8 +52,7 @@ def create(kwargs):
|
||||
kwargs['creator'] = g.current_user.email
|
||||
cert_body, intermediate, issuer_roles = issuer.create_authority(kwargs)
|
||||
|
||||
cert = Certificate(cert_body, chain=intermediate)
|
||||
cert.owner = kwargs['owner']
|
||||
cert = Certificate(body=cert_body, chain=intermediate, **kwargs)
|
||||
|
||||
if kwargs['type'] == 'subca':
|
||||
cert.description = "This is the ROOT certificate for the {0} sub certificate authority the parent \
|
||||
@ -73,7 +72,6 @@ def create(kwargs):
|
||||
# we create and attach any roles that the issuer gives us
|
||||
role_objs = []
|
||||
for r in issuer_roles:
|
||||
|
||||
role = role_service.create(
|
||||
r['name'],
|
||||
password=r['password'],
|
||||
@ -86,6 +84,16 @@ def create(kwargs):
|
||||
|
||||
role_objs.append(role)
|
||||
|
||||
# create an role for the owner and assign it
|
||||
owner_role = role_service.get_by_name(kwargs['owner'])
|
||||
if not owner_role:
|
||||
owner_role = role_service.create(
|
||||
kwargs['owner'],
|
||||
description="Auto generated role based on owner: {0}".format(kwargs['owner'])
|
||||
)
|
||||
|
||||
role_objs.append(owner_role)
|
||||
|
||||
authority = Authority(
|
||||
kwargs.get('name'),
|
||||
kwargs['owner'],
|
||||
@ -99,14 +107,6 @@ def create(kwargs):
|
||||
database.update(cert)
|
||||
authority = database.create(authority)
|
||||
|
||||
# the owning dl or role should have this authority associated with it
|
||||
owner_role = role_service.get_by_name(kwargs['owner'])
|
||||
|
||||
if not owner_role:
|
||||
owner_role = role_service.create(kwargs['owner'])
|
||||
|
||||
owner_role.authority = authority
|
||||
|
||||
g.current_user.authorities.append(authority)
|
||||
|
||||
return authority
|
||||
@ -181,8 +181,8 @@ def render(args):
|
||||
if not g.current_user.is_admin:
|
||||
authority_ids = []
|
||||
for role in g.current_user.roles:
|
||||
if role.authority:
|
||||
authority_ids.append(role.authority.id)
|
||||
for authority in role.authorities:
|
||||
authority_ids.append(authority.id)
|
||||
query = query.filter(Authority.id.in_(authority_ids))
|
||||
|
||||
return database.sort_and_page(query, Authority, args)
|
||||
|
@ -279,15 +279,15 @@ class Authorities(AuthenticatedResource):
|
||||
roles.append(role)
|
||||
permission = AuthorityPermission(authority_id, roles)
|
||||
|
||||
# we want to make sure that we cannot add roles that we are not members of
|
||||
if not g.current_user.is_admin:
|
||||
role_ids = set([r.id for r in data['roles']])
|
||||
user_role_ids = set([r.id for r in g.current_user.roles])
|
||||
|
||||
if not role_ids.issubset(user_role_ids):
|
||||
return dict(message="You are not allowed to associate a role which you are not a member of"), 400
|
||||
|
||||
if permission.can():
|
||||
# we want to make sure that we cannot add roles that we are not members of
|
||||
if not g.current_user.is_admin:
|
||||
role_ids = set([r.id for r in data['roles']])
|
||||
user_role_ids = set([r.id for r in g.current_user.roles])
|
||||
|
||||
if not role_ids.issubset(user_role_ids):
|
||||
return dict(message="You are not allowed to associate a role which you are not a member of."), 403
|
||||
|
||||
return service.update(
|
||||
authority_id,
|
||||
owner=data['owner'],
|
||||
@ -296,7 +296,7 @@ class Authorities(AuthenticatedResource):
|
||||
roles=data['roles']
|
||||
)
|
||||
|
||||
return dict(message="You are not authorized to update this authority"), 403
|
||||
return dict(message="You are not authorized to update this authority."), 403
|
||||
|
||||
|
||||
class CertificateAuthority(AuthenticatedResource):
|
||||
@ -345,7 +345,7 @@ class CertificateAuthority(AuthenticatedResource):
|
||||
"""
|
||||
cert = certificate_service.get(certificate_id)
|
||||
if not cert:
|
||||
return dict(message="Certificate not found"), 404
|
||||
return dict(message="Certificate not found."), 404
|
||||
|
||||
return cert.authority
|
||||
|
||||
|
@ -6,269 +6,88 @@
|
||||
.. moduleauthor:: Kevin Glisson <kglisson@netflix.com>
|
||||
"""
|
||||
import datetime
|
||||
from flask import current_app
|
||||
|
||||
from cryptography import x509
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from sqlalchemy.orm import relationship
|
||||
from sqlalchemy import event, Integer, ForeignKey, String, DateTime, PassiveDefault, func, Column, Text, Boolean
|
||||
from sqlalchemy.orm import relationship
|
||||
|
||||
from lemur.utils import Vault
|
||||
from lemur.database import db
|
||||
from lemur.plugins.base import plugins
|
||||
|
||||
from lemur.domains.models import Domain
|
||||
|
||||
from lemur.constants import SAN_NAMING_TEMPLATE, DEFAULT_NAMING_TEMPLATE
|
||||
|
||||
from lemur.models import certificate_associations, certificate_source_associations, \
|
||||
certificate_destination_associations, certificate_notification_associations, \
|
||||
certificate_replacement_associations
|
||||
certificate_replacement_associations, roles_certificates
|
||||
from lemur.plugins.base import plugins
|
||||
from lemur.utils import Vault
|
||||
|
||||
from lemur.common import defaults
|
||||
from lemur.domains.models import Domain
|
||||
|
||||
|
||||
def create_name(issuer, not_before, not_after, subject, san):
|
||||
"""
|
||||
Create a name for our certificate. A naming standard
|
||||
is based on a series of templates. The name includes
|
||||
useful information such as Common Name, Validation dates,
|
||||
and Issuer.
|
||||
def get_or_increase_name(name):
|
||||
count = Certificate.query.filter(Certificate.name == name).count()
|
||||
|
||||
:param san:
|
||||
:param subject:
|
||||
:param not_after:
|
||||
:param issuer:
|
||||
:param not_before:
|
||||
:rtype : str
|
||||
:return:
|
||||
"""
|
||||
if san:
|
||||
t = SAN_NAMING_TEMPLATE
|
||||
else:
|
||||
t = DEFAULT_NAMING_TEMPLATE
|
||||
if count == 1:
|
||||
return name + '-1'
|
||||
elif count > 1:
|
||||
return name + '-' + str(count)
|
||||
|
||||
temp = t.format(
|
||||
subject=subject,
|
||||
issuer=issuer,
|
||||
not_before=not_before.strftime('%Y%m%d'),
|
||||
not_after=not_after.strftime('%Y%m%d')
|
||||
)
|
||||
|
||||
disallowed_chars = ''.join(c for c in map(chr, range(256)) if not c.isalnum())
|
||||
disallowed_chars = disallowed_chars.replace("-", "")
|
||||
disallowed_chars = disallowed_chars.replace(".", "")
|
||||
temp = temp.replace('*', "WILDCARD")
|
||||
|
||||
for c in disallowed_chars:
|
||||
temp = temp.replace(c, "")
|
||||
|
||||
# white space is silly too
|
||||
final = temp.replace(" ", "-")
|
||||
|
||||
# we don't want any overlapping certificate names
|
||||
count = Certificate.query.filter(func.lower(Certificate.name) == func.lower(final)).count()
|
||||
|
||||
if count:
|
||||
count += 1
|
||||
final += str(count)
|
||||
|
||||
return final
|
||||
|
||||
|
||||
def get_signing_algorithm(cert):
|
||||
return cert.signature_hash_algorithm.name
|
||||
|
||||
|
||||
def get_cn(cert):
|
||||
"""
|
||||
Attempts to get a sane common name from a given certificate.
|
||||
|
||||
:param cert:
|
||||
:return: Common name or None
|
||||
"""
|
||||
return cert.subject.get_attributes_for_oid(
|
||||
x509.OID_COMMON_NAME
|
||||
)[0].value.strip()
|
||||
|
||||
|
||||
def get_domains(cert):
|
||||
"""
|
||||
Attempts to get an domains listed in a certificate.
|
||||
If 'subjectAltName' extension is not available we simply
|
||||
return the common name.
|
||||
|
||||
:param cert:
|
||||
:return: List of domains
|
||||
"""
|
||||
domains = []
|
||||
try:
|
||||
ext = cert.extensions.get_extension_for_oid(x509.OID_SUBJECT_ALTERNATIVE_NAME)
|
||||
entries = ext.value.get_values_for_type(x509.DNSName)
|
||||
for entry in entries:
|
||||
domains.append(entry)
|
||||
except Exception as e:
|
||||
current_app.logger.warning("Failed to get SubjectAltName: {0}".format(e))
|
||||
|
||||
return domains
|
||||
|
||||
|
||||
def get_serial(cert):
|
||||
"""
|
||||
Fetch the serial number from the certificate.
|
||||
|
||||
:param cert:
|
||||
:return: serial number
|
||||
"""
|
||||
return cert.serial
|
||||
|
||||
|
||||
def is_san(cert):
|
||||
"""
|
||||
Determines if a given certificate is a SAN certificate.
|
||||
SAN certificates are simply certificates that cover multiple domains.
|
||||
|
||||
:param cert:
|
||||
:return: Bool
|
||||
"""
|
||||
if len(get_domains(cert)) > 1:
|
||||
return True
|
||||
|
||||
|
||||
def is_wildcard(cert):
|
||||
"""
|
||||
Determines if certificate is a wildcard certificate.
|
||||
|
||||
:param cert:
|
||||
:return: Bool
|
||||
"""
|
||||
domains = get_domains(cert)
|
||||
if len(domains) == 1 and domains[0][0:1] == "*":
|
||||
return True
|
||||
|
||||
if cert.subject.get_attributes_for_oid(x509.OID_COMMON_NAME)[0].value[0:1] == "*":
|
||||
return True
|
||||
|
||||
|
||||
def get_bitstrength(cert):
|
||||
"""
|
||||
Calculates a certificates public key bit length.
|
||||
|
||||
:param cert:
|
||||
:return: Integer
|
||||
"""
|
||||
return cert.public_key().key_size
|
||||
|
||||
|
||||
def get_issuer(cert):
|
||||
"""
|
||||
Gets a sane issuer from a given certificate.
|
||||
|
||||
:param cert:
|
||||
:return: Issuer
|
||||
"""
|
||||
delchars = ''.join(c for c in map(chr, range(256)) if not c.isalnum())
|
||||
try:
|
||||
issuer = str(cert.issuer.get_attributes_for_oid(x509.OID_ORGANIZATION_NAME)[0].value)
|
||||
for c in delchars:
|
||||
issuer = issuer.replace(c, "")
|
||||
return issuer
|
||||
except Exception as e:
|
||||
current_app.logger.error("Unable to get issuer! {0}".format(e))
|
||||
|
||||
|
||||
def get_not_before(cert):
|
||||
"""
|
||||
Gets the naive datetime of the certificates 'not_before' field.
|
||||
This field denotes the first date in time which the given certificate
|
||||
is valid.
|
||||
|
||||
:param cert:
|
||||
:return: Datetime
|
||||
"""
|
||||
return cert.not_valid_before
|
||||
|
||||
|
||||
def get_not_after(cert):
|
||||
"""
|
||||
Gets the naive datetime of the certificates 'not_after' field.
|
||||
This field denotes the last date in time which the given certificate
|
||||
is valid.
|
||||
|
||||
:param cert:
|
||||
:return: Datetime
|
||||
"""
|
||||
return cert.not_valid_after
|
||||
|
||||
|
||||
def get_name_from_arn(arn):
|
||||
"""
|
||||
Extract the certificate name from an arn.
|
||||
|
||||
:param arn: IAM SSL arn
|
||||
:return: name of the certificate as uploaded to AWS
|
||||
"""
|
||||
return arn.split("/", 1)[1]
|
||||
|
||||
|
||||
def get_account_number(arn):
|
||||
"""
|
||||
Extract the account number from an arn.
|
||||
|
||||
:param arn: IAM SSL arn
|
||||
:return: account number associated with ARN
|
||||
"""
|
||||
return arn.split(":")[4]
|
||||
return name
|
||||
|
||||
|
||||
class Certificate(db.Model):
|
||||
__tablename__ = 'certificates'
|
||||
id = Column(Integer, primary_key=True)
|
||||
owner = Column(String(128))
|
||||
body = Column(Text())
|
||||
private_key = Column(Vault)
|
||||
status = Column(String(128))
|
||||
deleted = Column(Boolean, index=True)
|
||||
name = Column(String(128))
|
||||
owner = Column(String(128), nullable=False)
|
||||
name = Column(String(128), unique=True)
|
||||
description = Column(String(1024))
|
||||
active = Column(Boolean, default=True)
|
||||
|
||||
body = Column(Text(), nullable=False)
|
||||
chain = Column(Text())
|
||||
bits = Column(Integer())
|
||||
private_key = Column(Vault)
|
||||
|
||||
issuer = Column(String(128))
|
||||
serial = Column(String(128))
|
||||
cn = Column(String(128))
|
||||
description = Column(String(1024))
|
||||
active = Column(Boolean, default=True)
|
||||
san = Column(String(1024)) # TODO this should be migrated to boolean
|
||||
deleted = Column(Boolean, index=True)
|
||||
|
||||
not_before = Column(DateTime)
|
||||
not_after = Column(DateTime)
|
||||
date_created = Column(DateTime, PassiveDefault(func.now()), nullable=False)
|
||||
|
||||
signing_algorithm = Column(String(128))
|
||||
status = Column(String(128))
|
||||
bits = Column(Integer())
|
||||
san = Column(String(1024)) # TODO this should be migrated to boolean
|
||||
|
||||
user_id = Column(Integer, ForeignKey('users.id'))
|
||||
authority_id = Column(Integer, ForeignKey('authorities.id'))
|
||||
notifications = relationship("Notification", secondary=certificate_notification_associations, backref='certificate')
|
||||
destinations = relationship("Destination", secondary=certificate_destination_associations, backref='certificate')
|
||||
sources = relationship("Source", secondary=certificate_source_associations, backref='certificate')
|
||||
domains = relationship("Domain", secondary=certificate_associations, backref="certificate")
|
||||
roles = relationship("Role", secondary=roles_certificates, backref="certificate")
|
||||
replaces = relationship("Certificate",
|
||||
secondary=certificate_replacement_associations,
|
||||
primaryjoin=id == certificate_replacement_associations.c.certificate_id, # noqa
|
||||
secondaryjoin=id == certificate_replacement_associations.c.replaced_certificate_id, # noqa
|
||||
backref='replaced')
|
||||
sources = relationship("Source", secondary=certificate_source_associations, backref='certificate')
|
||||
domains = relationship("Domain", secondary=certificate_associations, backref="certificate")
|
||||
|
||||
def __init__(self, body, private_key=None, chain=None):
|
||||
self.body = body
|
||||
# We encrypt the private_key on creation
|
||||
self.private_key = private_key
|
||||
self.chain = chain
|
||||
cert = x509.load_pem_x509_certificate(bytes(self.body), default_backend())
|
||||
self.signing_algorithm = get_signing_algorithm(cert)
|
||||
self.bits = get_bitstrength(cert)
|
||||
self.issuer = get_issuer(cert)
|
||||
self.serial = get_serial(cert)
|
||||
self.cn = get_cn(cert)
|
||||
self.san = is_san(cert)
|
||||
self.not_before = get_not_before(cert)
|
||||
self.not_after = get_not_after(cert)
|
||||
self.name = create_name(self.issuer, self.not_before, self.not_after, self.cn, self.san)
|
||||
def __init__(self, **kwargs):
|
||||
cert = defaults.parse_certificate(kwargs['body'])
|
||||
self.owner = kwargs['owner']
|
||||
self.body = kwargs['body']
|
||||
self.private_key = kwargs.get('private_key')
|
||||
self.chain = kwargs.get('chain')
|
||||
self.signing_algorithm = defaults.signing_algorithm(cert)
|
||||
self.bits = defaults.bitstrength(cert)
|
||||
self.issuer = defaults.issuer(cert)
|
||||
self.serial = defaults.serial(cert)
|
||||
self.cn = defaults.common_name(cert)
|
||||
self.san = defaults.san(cert)
|
||||
self.not_before = defaults.not_before(cert)
|
||||
self.not_after = defaults.not_after(cert)
|
||||
self.name = get_or_increase_name(defaults.certificate_name(self.cn, self.issuer, self.not_before, self.not_after, self.san))
|
||||
|
||||
for domain in get_domains(cert):
|
||||
for domain in defaults.domains(cert):
|
||||
self.domains.append(Domain(name=domain))
|
||||
|
||||
@property
|
||||
|
@ -481,3 +481,23 @@ def stats(**kwargs):
|
||||
values.append(count)
|
||||
|
||||
return {'labels': keys, 'values': values}
|
||||
|
||||
|
||||
def get_account_number(arn):
|
||||
"""
|
||||
Extract the account number from an arn.
|
||||
|
||||
:param arn: IAM SSL arn
|
||||
:return: account number associated with ARN
|
||||
"""
|
||||
return arn.split(":")[4]
|
||||
|
||||
|
||||
def get_name_from_arn(arn):
|
||||
"""
|
||||
Extract the certificate name from an arn.
|
||||
|
||||
:param arn: IAM SSL arn
|
||||
:return: name of the certificate as uploaded to AWS
|
||||
"""
|
||||
return arn.split("/", 1)[1]
|
||||
|
172
lemur/common/defaults.py
Normal file
172
lemur/common/defaults.py
Normal file
@ -0,0 +1,172 @@
|
||||
|
||||
from flask import current_app
|
||||
from cryptography import x509
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from lemur.constants import SAN_NAMING_TEMPLATE, DEFAULT_NAMING_TEMPLATE
|
||||
|
||||
|
||||
def parse_certificate(body):
|
||||
return x509.load_pem_x509_certificate(bytes(body), default_backend())
|
||||
|
||||
|
||||
def certificate_name(common_name, issuer, not_before, not_after, san):
|
||||
"""
|
||||
Create a name for our certificate. A naming standard
|
||||
is based on a series of templates. The name includes
|
||||
useful information such as Common Name, Validation dates,
|
||||
and Issuer.
|
||||
|
||||
:param san:
|
||||
:param common_name:
|
||||
:param not_after:
|
||||
:param issuer:
|
||||
:param not_before:
|
||||
:rtype : str
|
||||
:return:
|
||||
"""
|
||||
if san:
|
||||
t = SAN_NAMING_TEMPLATE
|
||||
else:
|
||||
t = DEFAULT_NAMING_TEMPLATE
|
||||
|
||||
temp = t.format(
|
||||
subject=common_name,
|
||||
issuer=issuer,
|
||||
not_before=not_before.strftime('%Y%m%d'),
|
||||
not_after=not_after.strftime('%Y%m%d')
|
||||
)
|
||||
|
||||
disallowed_chars = ''.join(c for c in map(chr, range(256)) if not c.isalnum())
|
||||
disallowed_chars = disallowed_chars.replace("-", "")
|
||||
disallowed_chars = disallowed_chars.replace(".", "")
|
||||
temp = temp.replace('*', "WILDCARD")
|
||||
|
||||
for c in disallowed_chars:
|
||||
temp = temp.replace(c, "")
|
||||
|
||||
# white space is silly too
|
||||
return temp.replace(" ", "-")
|
||||
|
||||
|
||||
def signing_algorithm(cert):
|
||||
return cert.signature_hash_algorithm.name
|
||||
|
||||
|
||||
def common_name(cert):
|
||||
"""
|
||||
Attempts to get a sane common name from a given certificate.
|
||||
|
||||
:param cert:
|
||||
:return: Common name or None
|
||||
"""
|
||||
return cert.subject.get_attributes_for_oid(
|
||||
x509.OID_COMMON_NAME
|
||||
)[0].value.strip()
|
||||
|
||||
|
||||
def domains(cert):
|
||||
"""
|
||||
Attempts to get an domains listed in a certificate.
|
||||
If 'subjectAltName' extension is not available we simply
|
||||
return the common name.
|
||||
|
||||
:param cert:
|
||||
:return: List of domains
|
||||
"""
|
||||
domains = []
|
||||
try:
|
||||
ext = cert.extensions.get_extension_for_oid(x509.OID_SUBJECT_ALTERNATIVE_NAME)
|
||||
entries = ext.value.get_values_for_type(x509.DNSName)
|
||||
for entry in entries:
|
||||
domains.append(entry)
|
||||
except Exception as e:
|
||||
current_app.logger.warning("Failed to get SubjectAltName: {0}".format(e))
|
||||
|
||||
return domains
|
||||
|
||||
|
||||
def serial(cert):
|
||||
"""
|
||||
Fetch the serial number from the certificate.
|
||||
|
||||
:param cert:
|
||||
:return: serial number
|
||||
"""
|
||||
return cert.serial
|
||||
|
||||
|
||||
def san(cert):
|
||||
"""
|
||||
Determines if a given certificate is a SAN certificate.
|
||||
SAN certificates are simply certificates that cover multiple domains.
|
||||
|
||||
:param cert:
|
||||
:return: Bool
|
||||
"""
|
||||
if len(domains(cert)) > 1:
|
||||
return True
|
||||
|
||||
|
||||
def is_wildcard(cert):
|
||||
"""
|
||||
Determines if certificate is a wildcard certificate.
|
||||
|
||||
:param cert:
|
||||
:return: Bool
|
||||
"""
|
||||
d = domains(cert)
|
||||
if len(d) == 1 and d[0][0:1] == "*":
|
||||
return True
|
||||
|
||||
if cert.subject.get_attributes_for_oid(x509.OID_COMMON_NAME)[0].value[0:1] == "*":
|
||||
return True
|
||||
|
||||
|
||||
def bitstrength(cert):
|
||||
"""
|
||||
Calculates a certificates public key bit length.
|
||||
|
||||
:param cert:
|
||||
:return: Integer
|
||||
"""
|
||||
return cert.public_key().key_size
|
||||
|
||||
|
||||
def issuer(cert):
|
||||
"""
|
||||
Gets a sane issuer from a given certificate.
|
||||
|
||||
:param cert:
|
||||
:return: Issuer
|
||||
"""
|
||||
delchars = ''.join(c for c in map(chr, range(256)) if not c.isalnum())
|
||||
try:
|
||||
issuer = str(cert.issuer.get_attributes_for_oid(x509.OID_ORGANIZATION_NAME)[0].value)
|
||||
for c in delchars:
|
||||
issuer = issuer.replace(c, "")
|
||||
return issuer
|
||||
except Exception as e:
|
||||
current_app.logger.error("Unable to get issuer! {0}".format(e))
|
||||
|
||||
|
||||
def not_before(cert):
|
||||
"""
|
||||
Gets the naive datetime of the certificates 'not_before' field.
|
||||
This field denotes the first date in time which the given certificate
|
||||
is valid.
|
||||
|
||||
:param cert:
|
||||
:return: Datetime
|
||||
"""
|
||||
return cert.not_valid_before
|
||||
|
||||
|
||||
def not_after(cert):
|
||||
"""
|
||||
Gets the naive datetime of the certificates 'not_after' field.
|
||||
This field denotes the last date in time which the given certificate
|
||||
is valid.
|
||||
|
||||
:return: Datetime
|
||||
"""
|
||||
return cert.not_valid_after
|
@ -25,7 +25,7 @@ from lemur.certificates import service as cert_service
|
||||
from lemur.sources import service as source_service
|
||||
from lemur.notifications import service as notification_service
|
||||
|
||||
from lemur.certificates.models import get_name_from_arn
|
||||
from lemur.certificates.service import get_name_from_arn
|
||||
from lemur.certificates.verify import verify_string
|
||||
|
||||
from lemur.plugins.lemur_aws import elb
|
||||
|
65
lemur/migrations/versions/412b22cb656a_.py
Normal file
65
lemur/migrations/versions/412b22cb656a_.py
Normal file
@ -0,0 +1,65 @@
|
||||
"""empty message
|
||||
|
||||
Revision ID: 412b22cb656a
|
||||
Revises: 4c50b903d1ae
|
||||
Create Date: 2016-05-17 17:37:41.210232
|
||||
|
||||
"""
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '412b22cb656a'
|
||||
down_revision = '4c50b903d1ae'
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy.sql import text
|
||||
|
||||
|
||||
|
||||
|
||||
def upgrade():
|
||||
### commands auto generated by Alembic - please adjust! ###
|
||||
op.create_table('roles_authorities',
|
||||
sa.Column('authority_id', sa.Integer(), nullable=True),
|
||||
sa.Column('role_id', sa.Integer(), nullable=True),
|
||||
sa.ForeignKeyConstraint(['authority_id'], ['authorities.id'], ),
|
||||
sa.ForeignKeyConstraint(['role_id'], ['roles.id'], )
|
||||
)
|
||||
op.create_index('roles_authorities_ix', 'roles_authorities', ['authority_id', 'role_id'], unique=True)
|
||||
op.create_table('roles_certificates',
|
||||
sa.Column('certificate_id', sa.Integer(), nullable=True),
|
||||
sa.Column('role_id', sa.Integer(), nullable=True),
|
||||
sa.ForeignKeyConstraint(['certificate_id'], ['certificates.id'], ),
|
||||
sa.ForeignKeyConstraint(['role_id'], ['roles.id'], )
|
||||
)
|
||||
op.create_index('roles_certificates_ix', 'roles_certificates', ['certificate_id', 'role_id'], unique=True)
|
||||
op.create_index('certificate_associations_ix', 'certificate_associations', ['domain_id', 'certificate_id'], unique=True)
|
||||
op.create_index('certificate_destination_associations_ix', 'certificate_destination_associations', ['destination_id', 'certificate_id'], unique=True)
|
||||
op.create_index('certificate_notification_associations_ix', 'certificate_notification_associations', ['notification_id', 'certificate_id'], unique=True)
|
||||
op.create_index('certificate_replacement_associations_ix', 'certificate_replacement_associations', ['certificate_id', 'certificate_id'], unique=True)
|
||||
op.create_index('certificate_source_associations_ix', 'certificate_source_associations', ['source_id', 'certificate_id'], unique=True)
|
||||
op.create_index('roles_users_ix', 'roles_users', ['user_id', 'role_id'], unique=True)
|
||||
|
||||
### end Alembic commands ###
|
||||
|
||||
# migrate existing authority_id relationship to many_to_many
|
||||
conn = op.get_bind()
|
||||
for id, authority_id in conn.execute(text('select id, authority_id from roles where authority_id is not null')):
|
||||
stmt = text('insert into roles_authorities (role_id, authority_id) values (:role_id, :authority_id)')
|
||||
stmt = stmt.bindparams(role_id=id, authority_id=authority_id)
|
||||
op.execute(stmt)
|
||||
|
||||
|
||||
def downgrade():
|
||||
### commands auto generated by Alembic - please adjust! ###
|
||||
op.drop_index('roles_users_ix', table_name='roles_users')
|
||||
op.drop_index('certificate_source_associations_ix', table_name='certificate_source_associations')
|
||||
op.drop_index('certificate_replacement_associations_ix', table_name='certificate_replacement_associations')
|
||||
op.drop_index('certificate_notification_associations_ix', table_name='certificate_notification_associations')
|
||||
op.drop_index('certificate_destination_associations_ix', table_name='certificate_destination_associations')
|
||||
op.drop_index('certificate_associations_ix', table_name='certificate_associations')
|
||||
op.drop_index('roles_certificates_ix', table_name='roles_certificates')
|
||||
op.drop_table('roles_certificates')
|
||||
op.drop_index('roles_authorities_ix', table_name='roles_authorities')
|
||||
op.drop_table('roles_authorities')
|
||||
### end Alembic commands ###
|
@ -8,7 +8,8 @@
|
||||
:license: Apache, see LICENSE for more details.
|
||||
.. moduleauthor:: Kevin Glisson <kglisson@netflix.com>
|
||||
"""
|
||||
from sqlalchemy import Column, Integer, ForeignKey
|
||||
from sqlalchemy import Column, Integer, ForeignKey, Index
|
||||
|
||||
from lemur.database import db
|
||||
|
||||
certificate_associations = db.Table('certificate_associations',
|
||||
@ -16,6 +17,8 @@ certificate_associations = db.Table('certificate_associations',
|
||||
Column('certificate_id', Integer, ForeignKey('certificates.id'))
|
||||
)
|
||||
|
||||
Index('certificate_associations_ix', certificate_associations.c.domain_id, certificate_associations.c.certificate_id)
|
||||
|
||||
certificate_destination_associations = db.Table('certificate_destination_associations',
|
||||
Column('destination_id', Integer,
|
||||
ForeignKey('destinations.id', ondelete='cascade')),
|
||||
@ -23,6 +26,8 @@ certificate_destination_associations = db.Table('certificate_destination_associa
|
||||
ForeignKey('certificates.id', ondelete='cascade'))
|
||||
)
|
||||
|
||||
Index('certificate_destination_associations_ix', certificate_destination_associations.c.destination_id, certificate_destination_associations.c.certificate_id)
|
||||
|
||||
certificate_source_associations = db.Table('certificate_source_associations',
|
||||
Column('source_id', Integer,
|
||||
ForeignKey('sources.id', ondelete='cascade')),
|
||||
@ -30,6 +35,8 @@ certificate_source_associations = db.Table('certificate_source_associations',
|
||||
ForeignKey('certificates.id', ondelete='cascade'))
|
||||
)
|
||||
|
||||
Index('certificate_source_associations_ix', certificate_source_associations.c.source_id, certificate_source_associations.c.certificate_id)
|
||||
|
||||
certificate_notification_associations = db.Table('certificate_notification_associations',
|
||||
Column('notification_id', Integer,
|
||||
ForeignKey('notifications.id', ondelete='cascade')),
|
||||
@ -37,6 +44,8 @@ certificate_notification_associations = db.Table('certificate_notification_assoc
|
||||
ForeignKey('certificates.id', ondelete='cascade'))
|
||||
)
|
||||
|
||||
Index('certificate_notification_associations_ix', certificate_notification_associations.c.notification_id, certificate_notification_associations.c.certificate_id)
|
||||
|
||||
certificate_replacement_associations = db.Table('certificate_replacement_associations',
|
||||
Column('replaced_certificate_id', Integer,
|
||||
ForeignKey('certificates.id', ondelete='cascade')),
|
||||
@ -44,7 +53,26 @@ certificate_replacement_associations = db.Table('certificate_replacement_associa
|
||||
ForeignKey('certificates.id', ondelete='cascade'))
|
||||
)
|
||||
|
||||
Index('certificate_replacement_associations_ix', certificate_replacement_associations.c.certificate_id, certificate_replacement_associations.c.certificate_id)
|
||||
|
||||
roles_authorities = db.Table('roles_authorities',
|
||||
Column('authority_id', Integer, ForeignKey('authorities.id')),
|
||||
Column('role_id', Integer, ForeignKey('roles.id'))
|
||||
)
|
||||
|
||||
Index('roles_authorities_ix', roles_authorities.c.authority_id, roles_authorities.c.role_id)
|
||||
|
||||
roles_certificates = db.Table('roles_certificates',
|
||||
Column('certificate_id', Integer, ForeignKey('certificates.id')),
|
||||
Column('role_id', Integer, ForeignKey('roles.id'))
|
||||
)
|
||||
|
||||
Index('roles_certificates_ix', roles_certificates.c.certificate_id, roles_certificates.c.role_id)
|
||||
|
||||
|
||||
roles_users = db.Table('roles_users',
|
||||
Column('user_id', Integer, ForeignKey('users.id')),
|
||||
Column('role_id', Integer, ForeignKey('roles.id'))
|
||||
)
|
||||
|
||||
Index('roles_users_ix', roles_users.c.user_id, roles_users.c.role_id)
|
||||
|
@ -14,7 +14,7 @@ from sqlalchemy import Column, Integer, String, Text, ForeignKey
|
||||
|
||||
from lemur.database import db
|
||||
from lemur.utils import Vault
|
||||
from lemur.models import roles_users
|
||||
from lemur.models import roles_users, roles_authorities, roles_certificates
|
||||
|
||||
|
||||
class Role(db.Model):
|
||||
@ -25,5 +25,7 @@ class Role(db.Model):
|
||||
password = Column(Vault)
|
||||
description = Column(Text)
|
||||
authority_id = Column(Integer, ForeignKey('authorities.id'))
|
||||
authorities = relationship("Authority", secondary=roles_authorities, passive_deletes=True, backref="role", cascade='all,delete')
|
||||
user_id = Column(Integer, ForeignKey('users.id'))
|
||||
users = relationship("User", secondary=roles_users, viewonly=True, backref="role")
|
||||
certificates = relationship("Certificate", secondary=roles_certificates, backref="role")
|
||||
|
@ -69,8 +69,8 @@
|
||||
<li><a ui-sref="login">Login</a></li>
|
||||
</ul>
|
||||
<ul ng-show="currentUser.username" class="nav navbar-nav navbar-right">
|
||||
<li class="dropdown" dropdown on-toggle="toggled(open)">
|
||||
<a href class="dropdown-toggle profile-nav" dropdown-toggle>
|
||||
<li class="dropdown" uib-dropdown on-toggle="toggled(open)">
|
||||
<a href class="dropdown-toggle profile-nav" uib-dropdown-toggle>
|
||||
<span ng-if="currentUser.profileImage">
|
||||
{{ currentUser.username }}<img ng-src="{{ currentUser.profileImage }}" class="profile img-circle">
|
||||
</span>
|
||||
|
@ -7,6 +7,7 @@ from flask.ext.principal import identity_changed, Identity
|
||||
|
||||
from lemur import create_app
|
||||
from lemur.database import db as _db
|
||||
from lemur.auth.service import create_token
|
||||
|
||||
from .factories import AuthorityFactory, NotificationFactory, DestinationFactory, \
|
||||
CertificateFactory, UserFactory, RoleFactory
|
||||
@ -110,6 +111,15 @@ def role(session):
|
||||
return r
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def user(session):
|
||||
u = UserFactory()
|
||||
session.commit()
|
||||
user_token = create_token(u)
|
||||
token = {'Authorization': 'Basic ' + user_token}
|
||||
return {'user': u, 'token': token}
|
||||
|
||||
|
||||
@pytest.yield_fixture(scope="function")
|
||||
def logged_in_user(app, user):
|
||||
with app.test_request_context():
|
||||
|
@ -14,7 +14,7 @@ from lemur.notifications.models import Notification
|
||||
from lemur.users.models import User
|
||||
from lemur.roles.models import Role
|
||||
|
||||
from .vectors import INTERNAL_VALID_SAN_STR, PRIVATE_KEY_STR
|
||||
from .vectors import INTERNAL_VALID_LONG_STR, INTERNAL_VALID_SAN_STR, PRIVATE_KEY_STR
|
||||
|
||||
|
||||
class BaseFactory(SQLAlchemyModelFactory):
|
||||
@ -31,7 +31,7 @@ class AuthorityFactory(BaseFactory):
|
||||
name = Sequence(lambda n: 'authority{0}'.format(n))
|
||||
owner = 'joe@example.com'
|
||||
plugin_name = 'TheRing'
|
||||
body = INTERNAL_VALID_SAN_STR
|
||||
body = INTERNAL_VALID_LONG_STR
|
||||
|
||||
class Meta:
|
||||
"""Factory configuration."""
|
||||
@ -56,15 +56,8 @@ class CertificateFactory(BaseFactory):
|
||||
owner = 'joe@example.com'
|
||||
status = FuzzyChoice(['valid', 'revoked', 'unknown'])
|
||||
deleted = False
|
||||
bits = 2048
|
||||
issuer = 'Example'
|
||||
serial = FuzzyText(length=128)
|
||||
cn = 'test.example.com'
|
||||
description = FuzzyText(length=128)
|
||||
active = True
|
||||
san = 'true'
|
||||
not_before = FuzzyDate(date(2016, 1, 1), date(2020, 1, 1))
|
||||
not_after = FuzzyDate(date(2016, 1, 1), date(2020, 1, 1))
|
||||
date_created = FuzzyDate(date(2016, 1, 1), date(2020, 1, 1))
|
||||
|
||||
class Meta:
|
||||
@ -132,6 +125,15 @@ class CertificateFactory(BaseFactory):
|
||||
for domain in extracted:
|
||||
self.domains.append(domain)
|
||||
|
||||
@post_generation
|
||||
def roles(self, create, extracted, **kwargs):
|
||||
if not create:
|
||||
return
|
||||
|
||||
if extracted:
|
||||
for domain in extracted:
|
||||
self.roles.append(domain)
|
||||
|
||||
|
||||
class DestinationFactory(BaseFactory):
|
||||
"""Destination factory."""
|
||||
|
@ -25,6 +25,26 @@ def test_authority_input_schema(client, role):
|
||||
assert not errors
|
||||
|
||||
|
||||
@pytest.mark.parametrize("token, count", [
|
||||
(VALID_USER_HEADER_TOKEN, 0),
|
||||
(VALID_ADMIN_HEADER_TOKEN, 1)
|
||||
])
|
||||
def test_admin_authority(client, authority, token, count):
|
||||
assert client.get(api.url_for(AuthoritiesList), headers=token).json['total'] == count
|
||||
|
||||
|
||||
def test_user_authority(session, client, authority, role, user):
|
||||
assert client.get(api.url_for(AuthoritiesList), headers=user['token']).json['total'] == 0
|
||||
u = user['user']
|
||||
u.roles.append(role)
|
||||
authority.roles.append(role)
|
||||
session.commit()
|
||||
assert client.get(api.url_for(AuthoritiesList), headers=user['token']).json['total'] == 1
|
||||
u.roles.remove(role)
|
||||
session.commit()
|
||||
assert client.get(api.url_for(AuthoritiesList), headers=user['token']).json['total'] == 0
|
||||
|
||||
|
||||
@pytest.mark.parametrize("token,status", [
|
||||
(VALID_USER_HEADER_TOKEN, 404),
|
||||
(VALID_ADMIN_HEADER_TOKEN, 404),
|
||||
|
@ -186,8 +186,8 @@ def test_certificate_valid_dates(client, authority):
|
||||
'owner': 'jim@example.com',
|
||||
'authority': {'id': authority.id},
|
||||
'description': 'testtestest',
|
||||
'validityStart': '2017-04-30T00:12:34.513631',
|
||||
'validityEnd': '2018-04-30T00:12:34.513631'
|
||||
'validityStart': '2020-01-01T00:21:34.513631',
|
||||
'validityEnd': '2020-01-01T00:22:34.513631'
|
||||
}
|
||||
|
||||
data, errors = CertificateInputSchema().load(input_data)
|
||||
@ -293,79 +293,18 @@ def test_create_basic_csr(client):
|
||||
assert name.value in csr_config.values()
|
||||
|
||||
|
||||
def test_cert_get_cn(client):
|
||||
from .vectors import INTERNAL_VALID_LONG_CERT
|
||||
from lemur.certificates.models import get_cn
|
||||
|
||||
assert get_cn(INTERNAL_VALID_LONG_CERT) == 'long.lived.com'
|
||||
|
||||
|
||||
def test_cert_get_sub_alt_domains(client):
|
||||
from .vectors import INTERNAL_VALID_SAN_CERT, INTERNAL_VALID_LONG_CERT
|
||||
from lemur.certificates.models import get_domains
|
||||
|
||||
assert get_domains(INTERNAL_VALID_LONG_CERT) == []
|
||||
assert get_domains(INTERNAL_VALID_SAN_CERT) == ['example2.long.com', 'example3.long.com']
|
||||
|
||||
|
||||
def test_cert_is_san(client):
|
||||
from .vectors import INTERNAL_VALID_SAN_CERT, INTERNAL_VALID_LONG_CERT
|
||||
from lemur.certificates.models import is_san
|
||||
|
||||
assert not is_san(INTERNAL_VALID_LONG_CERT)
|
||||
assert is_san(INTERNAL_VALID_SAN_CERT)
|
||||
|
||||
|
||||
def test_cert_is_wildcard(client):
|
||||
from .vectors import INTERNAL_VALID_WILDCARD_CERT, INTERNAL_VALID_LONG_CERT
|
||||
from lemur.certificates.models import is_wildcard
|
||||
assert is_wildcard(INTERNAL_VALID_WILDCARD_CERT)
|
||||
assert not is_wildcard(INTERNAL_VALID_LONG_CERT)
|
||||
|
||||
|
||||
def test_cert_get_bitstrength(client):
|
||||
from .vectors import INTERNAL_VALID_LONG_CERT
|
||||
from lemur.certificates.models import get_bitstrength
|
||||
assert get_bitstrength(INTERNAL_VALID_LONG_CERT) == 2048
|
||||
|
||||
|
||||
def test_cert_get_issuer(client):
|
||||
from .vectors import INTERNAL_VALID_LONG_CERT
|
||||
from lemur.certificates.models import get_issuer
|
||||
assert get_issuer(INTERNAL_VALID_LONG_CERT) == 'Example'
|
||||
|
||||
|
||||
def test_get_name_from_arn(client):
|
||||
from lemur.certificates.models import get_name_from_arn
|
||||
from lemur.certificates.service import get_name_from_arn
|
||||
arn = 'arn:aws:iam::11111111:server-certificate/mycertificate'
|
||||
assert get_name_from_arn(arn) == 'mycertificate'
|
||||
|
||||
|
||||
def test_get_account_number(client):
|
||||
from lemur.certificates.models import get_account_number
|
||||
from lemur.certificates.service import get_account_number
|
||||
arn = 'arn:aws:iam::11111111:server-certificate/mycertificate'
|
||||
assert get_account_number(arn) == '11111111'
|
||||
|
||||
|
||||
def test_create_name(client):
|
||||
from lemur.certificates.models import create_name
|
||||
from datetime import datetime
|
||||
assert create_name(
|
||||
'Example Inc,',
|
||||
datetime(2015, 5, 7, 0, 0, 0),
|
||||
datetime(2015, 5, 12, 0, 0, 0),
|
||||
'example.com',
|
||||
False
|
||||
) == 'example.com-ExampleInc-20150507-20150512'
|
||||
assert create_name(
|
||||
'Example Inc,',
|
||||
datetime(2015, 5, 7, 0, 0, 0),
|
||||
datetime(2015, 5, 12, 0, 0, 0),
|
||||
'example.com',
|
||||
True
|
||||
) == 'SAN-example.com-ExampleInc-20150507-20150512'
|
||||
|
||||
|
||||
@pytest.mark.parametrize("token,status", [
|
||||
(VALID_USER_HEADER_TOKEN, 404),
|
||||
(VALID_ADMIN_HEADER_TOKEN, 404),
|
||||
|
61
lemur/tests/test_defaults.py
Normal file
61
lemur/tests/test_defaults.py
Normal file
@ -0,0 +1,61 @@
|
||||
|
||||
|
||||
def test_cert_get_cn(client):
|
||||
from .vectors import INTERNAL_VALID_LONG_CERT
|
||||
from lemur.common.defaults import common_name
|
||||
|
||||
assert common_name(INTERNAL_VALID_LONG_CERT) == 'long.lived.com'
|
||||
|
||||
|
||||
def test_cert_sub_alt_domains(client):
|
||||
from .vectors import INTERNAL_VALID_SAN_CERT, INTERNAL_VALID_LONG_CERT
|
||||
from lemur.common.defaults import domains
|
||||
|
||||
assert domains(INTERNAL_VALID_LONG_CERT) == []
|
||||
assert domains(INTERNAL_VALID_SAN_CERT) == ['example2.long.com', 'example3.long.com']
|
||||
|
||||
|
||||
def test_cert_is_san(client):
|
||||
from .vectors import INTERNAL_VALID_SAN_CERT, INTERNAL_VALID_LONG_CERT
|
||||
from lemur.common.defaults import san
|
||||
|
||||
assert not san(INTERNAL_VALID_LONG_CERT)
|
||||
assert san(INTERNAL_VALID_SAN_CERT)
|
||||
|
||||
|
||||
def test_cert_is_wildcard(client):
|
||||
from .vectors import INTERNAL_VALID_WILDCARD_CERT, INTERNAL_VALID_LONG_CERT
|
||||
from lemur.common.defaults import is_wildcard
|
||||
assert is_wildcard(INTERNAL_VALID_WILDCARD_CERT)
|
||||
assert not is_wildcard(INTERNAL_VALID_LONG_CERT)
|
||||
|
||||
|
||||
def test_cert_bitstrength(client):
|
||||
from .vectors import INTERNAL_VALID_LONG_CERT
|
||||
from lemur.common.defaults import bitstrength
|
||||
assert bitstrength(INTERNAL_VALID_LONG_CERT) == 2048
|
||||
|
||||
|
||||
def test_cert_issuer(client):
|
||||
from .vectors import INTERNAL_VALID_LONG_CERT
|
||||
from lemur.common.defaults import issuer
|
||||
assert issuer(INTERNAL_VALID_LONG_CERT) == 'Example'
|
||||
|
||||
|
||||
def test_create_name(client):
|
||||
from lemur.common.defaults import certificate_name
|
||||
from datetime import datetime
|
||||
assert certificate_name(
|
||||
'example.com',
|
||||
'Example Inc,',
|
||||
datetime(2015, 5, 7, 0, 0, 0),
|
||||
datetime(2015, 5, 12, 0, 0, 0),
|
||||
False
|
||||
) == 'example.com-ExampleInc-20150507-20150512'
|
||||
assert certificate_name(
|
||||
'example.com',
|
||||
'Example Inc,',
|
||||
datetime(2015, 5, 7, 0, 0, 0),
|
||||
datetime(2015, 5, 12, 0, 0, 0),
|
||||
True
|
||||
) == 'SAN-example.com-ExampleInc-20150507-20150512'
|
@ -1,6 +1,7 @@
|
||||
import pytest
|
||||
|
||||
from lemur.roles.views import * # noqa
|
||||
from lemur.tests.factories import RoleFactory, AuthorityFactory, CertificateFactory
|
||||
|
||||
|
||||
from .vectors import VALID_ADMIN_HEADER_TOKEN, VALID_USER_HEADER_TOKEN
|
||||
@ -18,6 +19,25 @@ def test_role_input_schema(client):
|
||||
assert not errors
|
||||
|
||||
|
||||
def test_multiple_authority_certificate_association(session, client):
|
||||
role = RoleFactory()
|
||||
authority = AuthorityFactory()
|
||||
certificate = CertificateFactory()
|
||||
authority1 = AuthorityFactory()
|
||||
certificate1 = CertificateFactory()
|
||||
|
||||
role.authorities.append(authority)
|
||||
role.authorities.append(authority1)
|
||||
role.certificates.append(certificate)
|
||||
role.certificates.append(certificate1)
|
||||
|
||||
session.commit()
|
||||
assert role.authorities[0].name == authority.name
|
||||
assert role.authorities[1].name == authority1.name
|
||||
assert role.certificates[0].name == certificate.name
|
||||
assert role.certificates[1].name == certificate1.name
|
||||
|
||||
|
||||
@pytest.mark.parametrize("token,status", [
|
||||
(VALID_USER_HEADER_TOKEN, 403),
|
||||
(VALID_ADMIN_HEADER_TOKEN, 200),
|
||||
|
Loading…
Reference in New Issue
Block a user