Orphaned certificates (#406)
* Fixing whitespace. * Fixing syncing. * Fixing tests
This commit is contained in:
parent
a644f45625
commit
29a330b1f4
|
@ -7,6 +7,7 @@
|
|||
"""
|
||||
import datetime
|
||||
|
||||
import lemur.common.utils
|
||||
from flask import current_app
|
||||
|
||||
from sqlalchemy.orm import relationship
|
||||
|
@ -38,7 +39,7 @@ class Certificate(db.Model):
|
|||
__tablename__ = 'certificates'
|
||||
id = Column(Integer, primary_key=True)
|
||||
owner = Column(String(128), nullable=False)
|
||||
name = Column(String(128)) # , unique=True) TODO make all names unique
|
||||
name = Column(String(128), unique=True)
|
||||
description = Column(String(1024))
|
||||
active = Column(Boolean, default=True)
|
||||
|
||||
|
@ -78,7 +79,7 @@ class Certificate(db.Model):
|
|||
endpoints = relationship("Endpoint", backref='certificate')
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
cert = defaults.parse_certificate(kwargs['body'])
|
||||
cert = lemur.common.utils.parse_certificate(kwargs['body'])
|
||||
|
||||
self.issuer = defaults.issuer(cert)
|
||||
self.cn = defaults.common_name(cert)
|
||||
|
@ -88,14 +89,19 @@ class Certificate(db.Model):
|
|||
|
||||
# when destinations are appended they require a valid name.
|
||||
if kwargs.get('name'):
|
||||
self.name = kwargs['name']
|
||||
self.name = get_or_increase_name(kwargs['name'])
|
||||
else:
|
||||
self.name = get_or_increase_name(defaults.certificate_name(self.cn, self.issuer, self.not_before, self.not_after, self.san))
|
||||
|
||||
self.owner = kwargs['owner']
|
||||
self.body = kwargs['body']
|
||||
self.private_key = kwargs.get('private_key')
|
||||
self.chain = kwargs.get('chain')
|
||||
self.body = kwargs['body'].strip()
|
||||
|
||||
if kwargs.get('private_key'):
|
||||
self.private_key = kwargs['private_key'].strip()
|
||||
|
||||
if kwargs.get('chain'):
|
||||
self.chain = kwargs['chain'].strip()
|
||||
|
||||
self.destinations = kwargs.get('destinations', [])
|
||||
self.notifications = kwargs.get('notifications', [])
|
||||
self.description = kwargs.get('description')
|
||||
|
|
|
@ -77,16 +77,16 @@ def get_by_source(source_label):
|
|||
return Certificate.query.filter(Certificate.sources.any(label=source_label))
|
||||
|
||||
|
||||
def find_duplicates(cert_body):
|
||||
def find_duplicates(cert):
|
||||
"""
|
||||
Finds certificates that already exist within Lemur. We do this by looking for
|
||||
certificate bodies that are the same. This is the most reliable way to determine
|
||||
if a certificate is already being tracked by Lemur.
|
||||
|
||||
:param cert_body:
|
||||
:param cert:
|
||||
:return:
|
||||
"""
|
||||
return Certificate.query.filter_by(body=cert_body).all()
|
||||
return Certificate.query.filter_by(body=cert['body'].strip(), chain=cert['chain'].strip()).all()
|
||||
|
||||
|
||||
def export(cert, export_plugin):
|
||||
|
@ -172,14 +172,9 @@ def import_certificate(**kwargs):
|
|||
|
||||
:param kwargs:
|
||||
"""
|
||||
from lemur.users import service as user_service
|
||||
|
||||
if not kwargs.get('owner'):
|
||||
kwargs['owner'] = current_app.config.get('LEMUR_SECURITY_TEAM_EMAIL')[0]
|
||||
|
||||
if not kwargs.get('creator'):
|
||||
kwargs['creator'] = user_service.get_by_email('lemur@nobody')
|
||||
|
||||
return upload(**kwargs)
|
||||
|
||||
|
||||
|
@ -187,7 +182,6 @@ def upload(**kwargs):
|
|||
"""
|
||||
Allows for pre-made certificates to be imported into Lemur.
|
||||
"""
|
||||
from lemur.users import service as user_service
|
||||
roles = create_certificate_roles(**kwargs)
|
||||
|
||||
if kwargs.get('roles'):
|
||||
|
@ -202,8 +196,7 @@ def upload(**kwargs):
|
|||
try:
|
||||
g.user.certificates.append(cert)
|
||||
except AttributeError:
|
||||
user = user_service.get_by_email('lemur@nobody')
|
||||
user.certificates.append(cert)
|
||||
current_app.logger.debug("No user to associate uploaded certificate to.")
|
||||
|
||||
return database.update(cert)
|
||||
|
||||
|
|
|
@ -1,17 +1,8 @@
|
|||
import sys
|
||||
from flask import current_app
|
||||
from cryptography import x509
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from flask import current_app
|
||||
from lemur.constants import SAN_NAMING_TEMPLATE, DEFAULT_NAMING_TEMPLATE
|
||||
|
||||
|
||||
def parse_certificate(body):
|
||||
if sys.version_info >= (3, 0):
|
||||
return x509.load_pem_x509_certificate(body, default_backend())
|
||||
else:
|
||||
return x509.load_pem_x509_certificate(bytes(body), default_backend())
|
||||
|
||||
|
||||
def certificate_name(common_name, issuer, not_before, not_after, san):
|
||||
"""
|
||||
Create a name for our certificate. A naming standard
|
||||
|
|
|
@ -6,15 +6,22 @@
|
|||
|
||||
.. moduleauthor:: Kevin Glisson <kglisson@netflix.com>
|
||||
"""
|
||||
import sys
|
||||
import string
|
||||
import random
|
||||
from functools import wraps
|
||||
|
||||
from flask import current_app
|
||||
from cryptography import x509
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
|
||||
from flask.ext.restful import marshal
|
||||
from flask.ext.restful.reqparse import RequestParser
|
||||
from flask.ext.sqlalchemy import Pagination
|
||||
|
||||
paginated_parser = RequestParser()
|
||||
|
||||
paginated_parser.add_argument('count', type=int, default=10, location='args')
|
||||
paginated_parser.add_argument('page', type=int, default=1, location='args')
|
||||
paginated_parser.add_argument('sortDir', type=str, dest='sort_dir', location='args')
|
||||
paginated_parser.add_argument('sortBy', type=str, dest='sort_by', location='args')
|
||||
paginated_parser.add_argument('filter', type=str, location='args')
|
||||
|
||||
|
||||
def get_psuedo_random_string():
|
||||
|
@ -28,51 +35,9 @@ def get_psuedo_random_string():
|
|||
return challenge
|
||||
|
||||
|
||||
class marshal_items(object):
|
||||
def __init__(self, fields, envelope=None):
|
||||
self.fields = fields
|
||||
self.envelop = envelope
|
||||
|
||||
def __call__(self, f):
|
||||
def _filter_items(items):
|
||||
filtered_items = []
|
||||
for item in items:
|
||||
filtered_items.append(marshal(item, self.fields))
|
||||
return filtered_items
|
||||
|
||||
@wraps(f)
|
||||
def wrapper(*args, **kwargs):
|
||||
try:
|
||||
resp = f(*args, **kwargs)
|
||||
|
||||
# this is a bit weird way to handle non standard error codes returned from the marshaled function
|
||||
if isinstance(resp, tuple):
|
||||
return resp[0], resp[1]
|
||||
|
||||
if isinstance(resp, Pagination):
|
||||
return {'items': _filter_items(resp.items), 'total': resp.total}
|
||||
|
||||
if isinstance(resp, list):
|
||||
return {'items': _filter_items(resp), 'total': len(resp)}
|
||||
|
||||
return marshal(resp, self.fields)
|
||||
except Exception as e:
|
||||
current_app.logger.exception(e)
|
||||
# this is a little weird hack to respect flask restful parsing errors on marshaled functions
|
||||
if hasattr(e, 'code'):
|
||||
if hasattr(e, 'data'):
|
||||
return {'message': e.data['message']}, 400
|
||||
else:
|
||||
return {'message': {'exception': 'unknown'}}, 400
|
||||
else:
|
||||
return {'message': {'exception': str(e)}}, 400
|
||||
return wrapper
|
||||
|
||||
|
||||
paginated_parser = RequestParser()
|
||||
|
||||
paginated_parser.add_argument('count', type=int, default=10, location='args')
|
||||
paginated_parser.add_argument('page', type=int, default=1, location='args')
|
||||
paginated_parser.add_argument('sortDir', type=str, dest='sort_dir', location='args')
|
||||
paginated_parser.add_argument('sortBy', type=str, dest='sort_by', location='args')
|
||||
paginated_parser.add_argument('filter', type=str, location='args')
|
||||
def parse_certificate(body):
|
||||
if sys.version_info >= (3, 0):
|
||||
if isinstance(body, bytes):
|
||||
return x509.load_pem_x509_certificate(body, default_backend())
|
||||
return x509.load_pem_x509_certificate(bytes(body, 'utf8'), default_backend())
|
||||
return x509.load_pem_x509_certificate(body, default_backend())
|
||||
|
|
|
@ -6,6 +6,7 @@ from cryptography import x509
|
|||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
|
||||
from lemur.common.utils import parse_certificate
|
||||
from lemur.domains import service as domain_service
|
||||
from lemur.auth.permissions import SensitiveDomainPermission
|
||||
|
||||
|
@ -18,7 +19,7 @@ def public_certificate(body):
|
|||
:return:
|
||||
"""
|
||||
try:
|
||||
x509.load_pem_x509_certificate(bytes(body), default_backend())
|
||||
parse_certificate(body)
|
||||
except Exception:
|
||||
raise ValidationError('Public certificate presented is not valid.')
|
||||
|
||||
|
|
115
lemur/manage.py
115
lemur/manage.py
|
@ -16,8 +16,6 @@ from gunicorn.config import make_settings
|
|||
|
||||
from cryptography.fernet import Fernet
|
||||
|
||||
from lockfile import LockFile, LockTimeout
|
||||
|
||||
from flask import current_app
|
||||
from flask.ext.script import Manager, Command, Option, prompt_pass
|
||||
from flask.ext.migrate import Migrate, MigrateCommand, stamp
|
||||
|
@ -27,7 +25,6 @@ from lemur import database
|
|||
from lemur.users import service as user_service
|
||||
from lemur.roles import service as role_service
|
||||
from lemur.certificates import service as cert_service
|
||||
from lemur.sources import service as source_service
|
||||
from lemur.authorities import service as authority_service
|
||||
from lemur.notifications import service as notification_service
|
||||
|
||||
|
@ -36,7 +33,7 @@ from lemur.certificates.verify import verify_string
|
|||
|
||||
from lemur.plugins.lemur_aws import elb
|
||||
|
||||
from lemur.sources.service import sync as source_sync
|
||||
from lemur.sources import service as source_service
|
||||
|
||||
from lemur import create_app
|
||||
|
||||
|
@ -194,59 +191,6 @@ def generate_settings():
|
|||
return output
|
||||
|
||||
|
||||
@manager.option('-s', '--sources', dest='labels')
|
||||
@manager.option('-t', '--type', dest='type')
|
||||
def sync(labels, type):
|
||||
"""
|
||||
Attempts to run several methods Certificate discovery. This is
|
||||
run on a periodic basis and updates the Lemur datastore with the
|
||||
information it discovers.
|
||||
"""
|
||||
if not labels:
|
||||
sys.stdout.write("Active\tLabel\tDescription\n")
|
||||
for source in source_service.get_all():
|
||||
sys.stdout.write(
|
||||
"{active}\t{label}\t{description}!\n".format(
|
||||
label=source.label,
|
||||
description=source.description,
|
||||
active=source.active
|
||||
)
|
||||
)
|
||||
else:
|
||||
start_time = time.time()
|
||||
lock_file = "/tmp/.lemur_lock"
|
||||
sync_lock = LockFile(lock_file)
|
||||
|
||||
while not sync_lock.i_am_locking():
|
||||
try:
|
||||
sync_lock.acquire(timeout=2) # wait up to 10 seconds
|
||||
|
||||
sys.stdout.write("[+] Staring to sync sources: {labels}!\n".format(labels=labels))
|
||||
labels = labels.split(",")
|
||||
|
||||
if labels[0] == 'all':
|
||||
source_sync()
|
||||
else:
|
||||
source_sync(labels=labels, type=type)
|
||||
|
||||
sys.stdout.write(
|
||||
"[+] Finished syncing sources. Run Time: {time}\n".format(
|
||||
time=(time.time() - start_time)
|
||||
)
|
||||
)
|
||||
except LockTimeout:
|
||||
sys.stderr.write(
|
||||
"[!] Unable to acquire file lock on {file}, is there another sync running?\n".format(
|
||||
file=lock_file
|
||||
)
|
||||
)
|
||||
sync_lock.break_lock()
|
||||
sync_lock.acquire()
|
||||
sync_lock.release()
|
||||
|
||||
sync_lock.release()
|
||||
|
||||
|
||||
@manager.command
|
||||
def notify():
|
||||
"""
|
||||
|
@ -881,7 +825,6 @@ class Report(Command):
|
|||
|
||||
end = datetime.utcnow()
|
||||
start = end - timedelta(days=duration)
|
||||
|
||||
self.certificates_issued(name, start, end)
|
||||
|
||||
@staticmethod
|
||||
|
@ -941,6 +884,61 @@ class Report(Command):
|
|||
sys.stdout.write(tabulate(rows, headers=["Authority Name", "Description", "Daily Average", "Monthy Average", "Yearly Average"]) + "\n")
|
||||
|
||||
|
||||
class Sources(Command):
|
||||
"""
|
||||
Defines a set of actions to take against Lemur's sources.
|
||||
"""
|
||||
option_list = (
|
||||
Option('-s', '--sources', dest='sources', action='append', help='Sources to operate on.'),
|
||||
Option('-a', '--action', choices=['sync', 'clean'], dest='action', help='Action to take on source.')
|
||||
)
|
||||
|
||||
def run(self, sources, action):
|
||||
if not sources:
|
||||
table = []
|
||||
for source in source_service.get_all():
|
||||
table.append([source.label, source.active, source.description])
|
||||
|
||||
sys.stdout.write(tabulate(table, headers=['Label', 'Active', 'Description']))
|
||||
sys.exit(1)
|
||||
|
||||
for label in sources:
|
||||
source = source_service.get_by_label(label)
|
||||
|
||||
if not source:
|
||||
sys.stderr.write("Unable to find specified source with label: {0}".format(label))
|
||||
|
||||
if action == 'sync':
|
||||
self.sync(source)
|
||||
|
||||
if action == 'clean':
|
||||
self.clean(source)
|
||||
|
||||
@staticmethod
|
||||
def sync(source):
|
||||
start_time = time.time()
|
||||
sys.stdout.write("[+] Staring to sync source: {label}!\n".format(label=source.label))
|
||||
source_service.sync(source)
|
||||
sys.stdout.write(
|
||||
"[+] Finished syncing source: {label}. Run Time: {time}\n".format(
|
||||
label=source.label,
|
||||
time=(time.time() - start_time)
|
||||
)
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def clean(source):
|
||||
start_time = time.time()
|
||||
sys.stdout.write("[+] Staring to clean source: {label}!\n".format(label=source.label))
|
||||
source_service.clean(source)
|
||||
sys.stdout.write(
|
||||
"[+] Finished cleaning source: {label}. Run Time: {time}\n".format(
|
||||
label=source.label,
|
||||
time=(time.time() - start_time)
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def main():
|
||||
manager.add_command("start", LemurServer())
|
||||
manager.add_command("runserver", Server(host='127.0.0.1', threaded=True))
|
||||
|
@ -954,6 +952,7 @@ def main():
|
|||
manager.add_command("provision_elb", ProvisionELB())
|
||||
manager.add_command("rotate_elbs", RotateELBs())
|
||||
manager.add_command("rolling", Rolling())
|
||||
manager.add_command("sources", Sources())
|
||||
manager.add_command("report", Report())
|
||||
manager.run()
|
||||
|
||||
|
|
|
@ -0,0 +1,35 @@
|
|||
"""Ensures that certificate name is unique
|
||||
|
||||
Revision ID: 7f71c0cea31a
|
||||
Revises: 29d8c8455c86
|
||||
Create Date: 2016-07-28 09:39:12.736506
|
||||
|
||||
"""
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '7f71c0cea31a'
|
||||
down_revision = '29d8c8455c86'
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy.sql import text
|
||||
|
||||
|
||||
def upgrade():
|
||||
conn = op.get_bind()
|
||||
|
||||
for id, body, chain in conn.execute(text('select id, body, chain from certificates')):
|
||||
if body and chain:
|
||||
stmt = text('update certificates set body=:body, chain=:chain where id=:id')
|
||||
stmt = stmt.bindparams(body=body.strip(), chain=chain.strip(), id=id)
|
||||
else:
|
||||
stmt = text('update certificates set body=:body where id=:id')
|
||||
stmt = stmt.bindparams(body=body.strip(), id=id)
|
||||
|
||||
op.execute(stmt)
|
||||
|
||||
op.create_unique_constraint(None, 'certificates', ['name'])
|
||||
|
||||
|
||||
def downgrade():
|
||||
op.drop_constraint(None, 'certificates', type_='unique')
|
|
@ -25,6 +25,12 @@ class SourcePlugin(Plugin):
|
|||
def get_certificates(self):
|
||||
raise NotImplemented
|
||||
|
||||
def get_endpoints(self):
|
||||
raise NotImplemented
|
||||
|
||||
def clean(self):
|
||||
raise NotImplemented
|
||||
|
||||
@property
|
||||
def options(self):
|
||||
return list(self.default_options) + self.additional_options
|
||||
|
|
|
@ -33,15 +33,15 @@ def upload_cert(account_number, name, body, private_key, cert_chain=None):
|
|||
cert_chain=str(cert_chain))
|
||||
|
||||
|
||||
def delete_cert(account_number, cert):
|
||||
def delete_cert(account_number, cert_name):
|
||||
"""
|
||||
Delete a certificate from AWS
|
||||
|
||||
:param account_number:
|
||||
:param cert:
|
||||
:param cert_name:
|
||||
:return:
|
||||
"""
|
||||
return assume_service(account_number, 'iam').delete_server_cert(cert.name)
|
||||
return assume_service(account_number, 'iam').delete_server_cert(cert_name)
|
||||
|
||||
|
||||
def get_all_server_certs(account_number):
|
||||
|
|
|
@ -155,6 +155,22 @@ class AWSSourcePlugin(SourcePlugin):
|
|||
|
||||
return endpoints
|
||||
|
||||
def clean(self, options, **kwargs):
|
||||
account_number = self.get_option('accountNumber', options)
|
||||
certificates = self.get_certificates(options)
|
||||
endpoints = self.get_endpoints(options)
|
||||
|
||||
orphaned = []
|
||||
for certificate in certificates:
|
||||
for endpoint in endpoints:
|
||||
if certificate['name'] == endpoint['certificate_name']:
|
||||
break
|
||||
else:
|
||||
orphaned.append(certificate['name'])
|
||||
iam.delete_cert(account_number, certificate)
|
||||
|
||||
return orphaned
|
||||
|
||||
|
||||
def format_elb_cipher_policy(policy):
|
||||
"""
|
||||
|
|
|
@ -21,11 +21,11 @@ from lemur.plugins.base import plugins
|
|||
|
||||
|
||||
# TODO optimize via sql query
|
||||
def _disassociate_certs_from_source(found_certificates, source_label):
|
||||
current_certificates = cert_service.get_by_source(source_label=source_label)
|
||||
def _disassociate_certs_from_source(certificates, source):
|
||||
current_certificates = cert_service.get_by_source(source_label=source.label)
|
||||
missing = []
|
||||
for cc in current_certificates:
|
||||
for fc in found_certificates:
|
||||
for fc in certificates:
|
||||
if fc['body'] == cc.body:
|
||||
break
|
||||
else:
|
||||
|
@ -33,22 +33,22 @@ def _disassociate_certs_from_source(found_certificates, source_label):
|
|||
|
||||
for c in missing:
|
||||
for s in c.sources:
|
||||
if s.label == source_label:
|
||||
if s.label == source:
|
||||
current_app.logger.info(
|
||||
"Certificate {name} is no longer associated with {source}.".format(
|
||||
name=c.name,
|
||||
source=source_label
|
||||
source=source.label
|
||||
)
|
||||
)
|
||||
c.sources.delete(s)
|
||||
|
||||
|
||||
# TODO optimize via sql query
|
||||
def _disassociate_endpoints_from_source(found_endpoints, source_label):
|
||||
current_endpoints = endpoint_service.get_by_source(source_label=source_label)
|
||||
def _disassociate_endpoints_from_source(endpoints, source):
|
||||
current_endpoints = endpoint_service.get_by_source(source_label=source.label)
|
||||
|
||||
for ce in current_endpoints:
|
||||
for fe in found_endpoints:
|
||||
for fe in endpoints:
|
||||
if ce.dnsname == fe['dnsname']:
|
||||
break
|
||||
else:
|
||||
|
@ -108,6 +108,7 @@ def sync_endpoints(source):
|
|||
certificate = endpoint.pop('certificate', None)
|
||||
|
||||
if certificate_name:
|
||||
current_app.logger.debug(certificate_name)
|
||||
cert = cert_service.get_by_name(certificate_name)
|
||||
|
||||
elif certificate:
|
||||
|
@ -116,7 +117,8 @@ def sync_endpoints(source):
|
|||
cert = cert_service.import_certificate(**certificate)
|
||||
|
||||
if not cert:
|
||||
current_app.logger.error("Unable to find associated certificate, be sure that certificates are sync'ed before endpoints")
|
||||
current_app.logger.error(
|
||||
"Unable to find associated certificate, be sure that certificates are sync'ed before endpoints")
|
||||
continue
|
||||
|
||||
endpoint['certificate'] = cert
|
||||
|
@ -149,7 +151,7 @@ def sync_certificates(source):
|
|||
certificates = s.get_certificates(source.options)
|
||||
|
||||
for certificate in certificates:
|
||||
exists = cert_service.find_duplicates(certificate['body'])
|
||||
exists = cert_service.find_duplicates(certificate)
|
||||
|
||||
if not exists:
|
||||
certificate_create(certificate, source)
|
||||
|
@ -170,18 +172,7 @@ def sync_certificates(source):
|
|||
_disassociate_certs_from_source(certificates, source)
|
||||
|
||||
|
||||
def sync(labels=None, type=None):
|
||||
for source in database.get_all(Source, True, field='active'):
|
||||
# we should be able to specify, individual sources to sync
|
||||
if labels:
|
||||
if source.label not in labels:
|
||||
continue
|
||||
|
||||
if type == 'endpoints':
|
||||
sync_endpoints(source)
|
||||
elif type == 'certificates':
|
||||
sync_certificates(source)
|
||||
else:
|
||||
def sync(source):
|
||||
sync_certificates(source)
|
||||
sync_endpoints(source)
|
||||
|
||||
|
@ -189,6 +180,29 @@ def sync(labels=None, type=None):
|
|||
database.update(source)
|
||||
|
||||
|
||||
def clean(source):
|
||||
s = plugins.get(source.plugin_name)
|
||||
|
||||
try:
|
||||
certificates = s.clean(source.options)
|
||||
except NotImplemented:
|
||||
current_app.logger.warning("Cannot clean source: {0}, source plugin does not implement 'clean()'".format(
|
||||
source.label
|
||||
))
|
||||
return
|
||||
|
||||
for certificate in certificates:
|
||||
current_app.logger.debug(certificate)
|
||||
cert = cert_service.get_by_name(certificate)
|
||||
|
||||
if cert:
|
||||
current_app.logger.warning("Removed {0} from source {1} during cleaning".format(
|
||||
cert.name,
|
||||
source.label
|
||||
))
|
||||
cert.sources.remove(source)
|
||||
|
||||
|
||||
def create(label, plugin_name, options, description=None):
|
||||
"""
|
||||
Creates a new source, that can then be used as a source for certificates.
|
||||
|
|
|
@ -360,7 +360,7 @@ def test_upload(logged_in_user):
|
|||
assert cert.name == 'long.lived.com-Example-20150626-20400101-2'
|
||||
|
||||
cert = upload(body=INTERNAL_VALID_LONG_STR, chain=INTERNAL_VALID_SAN_STR, private_key=PRIVATE_KEY_STR, owner='joe@example.com', name='ACustomName')
|
||||
assert cert.name == 'ACustomName'
|
||||
assert 'ACustomName' in cert.name
|
||||
|
||||
|
||||
@pytest.mark.parametrize("token,status", [
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
from cryptography import x509
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from lemur.common.utils import parse_certificate
|
||||
|
||||
VALID_USER_HEADER_TOKEN = {
|
||||
'Authorization': 'Basic ' + 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE0MzUyMzMzNjksInN1YiI6MSwiZXhwIjoxNTIxNTQ2OTY5fQ.1qCi0Ip7mzKbjNh0tVd3_eJOrae3rNa_9MCVdA4WtQI'}
|
||||
|
@ -34,7 +33,7 @@ Ygk1wptlt/tg7uUmstmXZA4vTPx83f4P3KSS3XHIYFIyGFWUDs23C20K6mmW1iXa
|
|||
h0S8LN4iv/+vNFPNiM1z9X/SZgfbwZXrLsSi
|
||||
-----END CERTIFICATE-----
|
||||
"""
|
||||
INTERNAL_VALID_LONG_CERT = x509.load_pem_x509_certificate(INTERNAL_VALID_LONG_STR, default_backend())
|
||||
INTERNAL_VALID_LONG_CERT = parse_certificate(INTERNAL_VALID_LONG_STR)
|
||||
|
||||
|
||||
INTERNAL_INVALID_STR = b"""
|
||||
|
@ -63,7 +62,7 @@ T7W3s8mm5bVHhQM7J9tV6dz/sVDmpOSuzL8oZkqeKP+lWU6ytaohFFpbdzaxWipU
|
|||
kP+oGWtHvhteUAe8Gloo5NchZJ0/BqlYRCD5aAHcmbXRsDid9mO4ADU=
|
||||
-----END CERTIFICATE-----
|
||||
"""
|
||||
INTERNAL_INVALID_CERT = x509.load_pem_x509_certificate(INTERNAL_INVALID_STR, default_backend())
|
||||
INTERNAL_INVALID_CERT = parse_certificate(INTERNAL_INVALID_STR)
|
||||
|
||||
|
||||
INTERNAL_VALID_SAN_STR = b"""
|
||||
|
@ -93,7 +92,7 @@ YBrY/duF15YpoMKAlFhDBh6R9/nb5kI2n3pY6I5h6LEYfLStazXbIu61M8zu9TM/
|
|||
+t5Oz6rmcjohL22+sEmmRz86dQZlrBBUxX0kCQj6OAFB4awtRd4fKtkCkZhvhQ==
|
||||
-----END CERTIFICATE-----
|
||||
"""
|
||||
INTERNAL_VALID_SAN_CERT = x509.load_pem_x509_certificate(INTERNAL_VALID_SAN_STR, default_backend())
|
||||
INTERNAL_VALID_SAN_CERT = parse_certificate(INTERNAL_VALID_SAN_STR)
|
||||
|
||||
|
||||
INTERNAL_VALID_WILDCARD_STR = b"""
|
||||
|
@ -122,7 +121,7 @@ UGniiUh4bAUuppbtSIvUTsRsJuPYOqHC3h8791JZ/3Sr5uB7QbCdz9K14c9zi6Z1
|
|||
S0Xb3ZauZJQI7OdHeUPDRVq+8hcG77sopN9pEYrIH08oxvLX2US3GqrowjOxthRa
|
||||
-----END CERTIFICATE-----
|
||||
"""
|
||||
INTERNAL_VALID_WILDCARD_CERT = x509.load_pem_x509_certificate(INTERNAL_VALID_WILDCARD_STR, default_backend())
|
||||
INTERNAL_VALID_WILDCARD_CERT = parse_certificate(INTERNAL_VALID_WILDCARD_STR)
|
||||
|
||||
|
||||
EXTERNAL_VALID_STR = b"""
|
||||
|
@ -157,7 +156,7 @@ Bs63gULVCqWygt5KEbv990m/XGuRMaXuHzHCHB4v5LRM30FiFmqCzyD8d+btzW9B
|
|||
1hZ5s3rj+a6UwvpinKJoPfgkgg==
|
||||
-----END CERTIFICATE-----
|
||||
"""
|
||||
EXTERNAL_CERT = x509.load_pem_x509_certificate(EXTERNAL_VALID_STR, default_backend())
|
||||
EXTERNAL_CERT = parse_certificate(EXTERNAL_VALID_STR)
|
||||
|
||||
|
||||
PRIVATE_KEY_STR = b"""
|
||||
|
|
Loading…
Reference in New Issue