Fixing a few things, adding tests. (#326)
This commit is contained in:
parent
615df76dd5
commit
e04c1e7dc9
|
@ -157,8 +157,9 @@ def get_authority_role(ca_name):
|
|||
else:
|
||||
for role in g.current_user.roles:
|
||||
if role.authority:
|
||||
if role.authority.name == ca_name:
|
||||
return role
|
||||
for authority in role.authorities:
|
||||
if authority.name == ca_name:
|
||||
return role
|
||||
|
||||
|
||||
def render(args):
|
||||
|
|
|
@ -7,6 +7,8 @@
|
|||
"""
|
||||
import datetime
|
||||
|
||||
from flask import current_app
|
||||
|
||||
from sqlalchemy import event, Integer, ForeignKey, String, DateTime, PassiveDefault, func, Column, Text, Boolean
|
||||
from sqlalchemy.orm import relationship
|
||||
|
||||
|
@ -22,11 +24,9 @@ from lemur.domains.models import Domain
|
|||
|
||||
|
||||
def get_or_increase_name(name):
|
||||
count = Certificate.query.filter(Certificate.name == name).count()
|
||||
count = Certificate.query.filter(Certificate.name.ilike('{0}%'.format(name))).count()
|
||||
|
||||
if count == 1:
|
||||
return name + '-1'
|
||||
elif count > 1:
|
||||
if count >= 1:
|
||||
return name + '-' + str(count)
|
||||
|
||||
return name
|
||||
|
@ -77,6 +77,11 @@ class Certificate(db.Model):
|
|||
self.body = kwargs['body']
|
||||
self.private_key = kwargs.get('private_key')
|
||||
self.chain = kwargs.get('chain')
|
||||
self.destinations = kwargs.get('destinations', [])
|
||||
self.notifications = kwargs.get('notifications', [])
|
||||
self.description = kwargs.get('description')
|
||||
self.roles = kwargs.get('roles', [])
|
||||
self.replaces = kwargs.get('replacements', [])
|
||||
self.signing_algorithm = defaults.signing_algorithm(cert)
|
||||
self.bits = defaults.bitstrength(cert)
|
||||
self.issuer = defaults.issuer(cert)
|
||||
|
@ -129,7 +134,10 @@ def update_destinations(target, value, initiator):
|
|||
:return:
|
||||
"""
|
||||
destination_plugin = plugins.get(value.plugin_name)
|
||||
destination_plugin.upload(target.name, target.body, target.private_key, target.chain, value.options)
|
||||
try:
|
||||
destination_plugin.upload(target.name, target.body, target.private_key, target.chain, value.options)
|
||||
except Exception as e:
|
||||
current_app.logger.exception(e)
|
||||
|
||||
|
||||
@event.listens_for(Certificate.replaces, 'append')
|
||||
|
|
|
@ -7,7 +7,7 @@
|
|||
"""
|
||||
from flask import current_app
|
||||
|
||||
from marshmallow import fields, validates_schema
|
||||
from marshmallow import fields, validates_schema, post_load
|
||||
from marshmallow.exceptions import ValidationError
|
||||
|
||||
from lemur.schemas import AssociatedAuthoritySchema, AssociatedDestinationSchema, AssociatedCertificateSchema, \
|
||||
|
@ -21,12 +21,26 @@ from lemur.users.schemas import UserNestedOutputSchema
|
|||
|
||||
from lemur.common.schema import LemurInputSchema, LemurOutputSchema
|
||||
from lemur.common import validators
|
||||
from lemur.notifications import service as notification_service
|
||||
|
||||
|
||||
class CertificateInputSchema(LemurInputSchema):
|
||||
name = fields.String()
|
||||
class CertificateSchema(LemurInputSchema):
|
||||
owner = fields.Email(required=True)
|
||||
description = fields.String()
|
||||
|
||||
@post_load
|
||||
def default_notifications(self, data):
|
||||
if not data['notifications']:
|
||||
notification_name = "DEFAULT_{0}".format(data['owner'].split('@')[0].upper())
|
||||
data['notifications'] += notification_service.create_default_expiration_notifications(notification_name, [data['owner']])
|
||||
|
||||
notification_name = 'DEFAULT_SECURITY'
|
||||
data['notifications'] += notification_service.create_default_expiration_notifications(notification_name, current_app.config.get('LEMUR_SECURITY_TEAM_EMAIL'))
|
||||
return data
|
||||
|
||||
|
||||
class CertificateInputSchema(CertificateSchema):
|
||||
name = fields.String()
|
||||
common_name = fields.String(required=True, validate=validators.sensitive_domain)
|
||||
authority = fields.Nested(AssociatedAuthoritySchema, required=True)
|
||||
|
||||
|
@ -54,9 +68,7 @@ class CertificateInputSchema(LemurInputSchema):
|
|||
validators.dates(data)
|
||||
|
||||
|
||||
class CertificateEditInputSchema(LemurInputSchema):
|
||||
owner = fields.Email(required=True)
|
||||
description = fields.String()
|
||||
class CertificateEditInputSchema(CertificateSchema):
|
||||
active = fields.Boolean()
|
||||
destinations = fields.Nested(AssociatedDestinationSchema, missing=[], many=True)
|
||||
notifications = fields.Nested(AssociatedNotificationSchema, missing=[], many=True)
|
||||
|
@ -107,14 +119,12 @@ class CertificateOutputSchema(LemurOutputSchema):
|
|||
authority = fields.Nested(AuthorityNestedOutputSchema)
|
||||
|
||||
|
||||
class CertificateUploadInputSchema(LemurInputSchema):
|
||||
class CertificateUploadInputSchema(CertificateSchema):
|
||||
name = fields.String()
|
||||
owner = fields.Email(required=True)
|
||||
description = fields.String()
|
||||
active = fields.Boolean(missing=True)
|
||||
|
||||
private_key = fields.String(validate=validators.private_key)
|
||||
public_cert = fields.String(required=True, validate=validators.public_certificate)
|
||||
body = fields.String(required=True, validate=validators.public_certificate)
|
||||
chain = fields.String(validate=validators.public_certificate) # TODO this could be multiple certificates
|
||||
|
||||
destinations = fields.Nested(AssociatedDestinationSchema, missing=[], many=True)
|
||||
|
|
|
@ -103,59 +103,36 @@ def update(cert_id, owner, description, active, destinations, notifications, rep
|
|||
:param replaces:
|
||||
:return:
|
||||
"""
|
||||
from lemur.notifications import service as notification_service
|
||||
cert = get(cert_id)
|
||||
cert.active = active
|
||||
cert.description = description
|
||||
|
||||
# we might have to create new notifications if the owner changes
|
||||
new_notifications = []
|
||||
# get existing names to remove
|
||||
notification_name = "DEFAULT_{0}".format(cert.owner.split('@')[0].upper())
|
||||
for n in notifications:
|
||||
if notification_name not in n.label:
|
||||
new_notifications.append(n)
|
||||
|
||||
notification_name = "DEFAULT_{0}".format(owner.split('@')[0].upper())
|
||||
new_notifications += notification_service.create_default_expiration_notifications(notification_name, owner)
|
||||
|
||||
cert.notifications = new_notifications
|
||||
|
||||
database.update_list(cert, 'destinations', Destination, destinations)
|
||||
database.update_list(cert, 'replaces', Certificate, replaces)
|
||||
|
||||
cert.destinations = destinations
|
||||
cert.replaces = replaces
|
||||
cert.owner = owner
|
||||
|
||||
return database.update(cert)
|
||||
|
||||
|
||||
def mint(issuer_options):
|
||||
def mint(**kwargs):
|
||||
"""
|
||||
Minting is slightly different for each authority.
|
||||
Support for multiple authorities is handled by individual plugins.
|
||||
|
||||
:param issuer_options:
|
||||
"""
|
||||
authority = issuer_options['authority']
|
||||
authority = kwargs['authority']
|
||||
|
||||
issuer = plugins.get(authority.plugin_name)
|
||||
|
||||
# allow the CSR to be specified by the user
|
||||
if not issuer_options.get('csr'):
|
||||
csr, private_key = create_csr(issuer_options)
|
||||
if not kwargs.get('csr'):
|
||||
csr, private_key = create_csr(**kwargs)
|
||||
else:
|
||||
csr = str(issuer_options.get('csr'))
|
||||
csr = str(kwargs.get('csr'))
|
||||
private_key = None
|
||||
|
||||
issuer_options['creator'] = g.user.email
|
||||
cert_body, cert_chain = issuer.create_certificate(csr, issuer_options)
|
||||
|
||||
cert = Certificate(cert_body, private_key, cert_chain)
|
||||
|
||||
cert.user = g.user
|
||||
cert.authority = authority
|
||||
database.update(cert)
|
||||
return cert, private_key, cert_chain,
|
||||
cert_body, cert_chain = issuer.create_certificate(csr, kwargs)
|
||||
return cert_body, private_key, cert_chain,
|
||||
|
||||
|
||||
def import_certificate(**kwargs):
|
||||
|
@ -172,69 +149,29 @@ def import_certificate(**kwargs):
|
|||
:param kwargs:
|
||||
"""
|
||||
from lemur.users import service as user_service
|
||||
from lemur.notifications import service as notification_service
|
||||
cert = Certificate(kwargs['public_certificate'], chain=kwargs['intermediate_certificate'])
|
||||
|
||||
# TODO future source plugins might have a better understanding of who the 'owner' is we should support this
|
||||
cert.owner = kwargs.get('owner', current_app.config.get('LEMUR_SECURITY_TEAM_EMAIL')[0])
|
||||
cert.creator = kwargs.get('creator', user_service.get_by_email('lemur@nobody'))
|
||||
if not kwargs.get('owner'):
|
||||
kwargs['owner'] = current_app.config.get('LEMUR_SECURITY_TEAM_EMAIL')[0]
|
||||
|
||||
# NOTE existing certs may not follow our naming standard we will
|
||||
# overwrite the generated name with the actual cert name
|
||||
if kwargs.get('name'):
|
||||
cert.name = kwargs.get('name')
|
||||
if not kwargs.get('creator'):
|
||||
kwargs['creator'] = user_service.get_by_email('lemur@nobody')
|
||||
|
||||
if kwargs.get('user'):
|
||||
cert.user = kwargs.get('user')
|
||||
|
||||
notification_name = 'DEFAULT_SECURITY'
|
||||
notifications = notification_service.create_default_expiration_notifications(notification_name, current_app.config.get('LEMUR_SECURITY_TEAM_EMAIL'))
|
||||
|
||||
if kwargs.get('replacements'):
|
||||
database.update_list(cert, 'replaces', Certificate, kwargs['replacements'])
|
||||
|
||||
cert.notifications = notifications
|
||||
|
||||
cert = database.create(cert)
|
||||
return cert
|
||||
return upload(**kwargs)
|
||||
|
||||
|
||||
def upload(**kwargs):
|
||||
"""
|
||||
Allows for pre-made certificates to be imported into Lemur.
|
||||
"""
|
||||
from lemur.notifications import service as notification_service
|
||||
cert = Certificate(
|
||||
kwargs.get('public_cert'),
|
||||
kwargs.get('private_key'),
|
||||
kwargs.get('intermediate_cert'),
|
||||
)
|
||||
cert = Certificate(**kwargs)
|
||||
|
||||
# we override the generated name if one is provided
|
||||
if kwargs.get('name'):
|
||||
cert.name = kwargs['name']
|
||||
|
||||
cert.description = kwargs.get('description')
|
||||
|
||||
cert.owner = kwargs['owner']
|
||||
cert = database.create(cert)
|
||||
|
||||
g.user.certificates.append(cert)
|
||||
|
||||
database.update_list(cert, 'destinations', Destination, kwargs['destinations'])
|
||||
database.update_list(cert, 'notifications', Notification, kwargs['notifications'])
|
||||
database.update_list(cert, 'replaces', Certificate, kwargs['replacements'])
|
||||
|
||||
# create default notifications for this certificate if none are provided
|
||||
notifications = []
|
||||
if not kwargs.get('notifications'):
|
||||
notification_name = "DEFAULT_{0}".format(cert.owner.split('@')[0].upper())
|
||||
notifications += notification_service.create_default_expiration_notifications(notification_name, [cert.owner])
|
||||
|
||||
notification_name = 'DEFAULT_SECURITY'
|
||||
notifications += notification_service.create_default_expiration_notifications(notification_name, current_app.config.get('LEMUR_SECURITY_TEAM_EMAIL'))
|
||||
cert.notifications = notifications
|
||||
|
||||
database.update(cert)
|
||||
return cert
|
||||
|
||||
|
@ -243,38 +180,21 @@ def create(**kwargs):
|
|||
"""
|
||||
Creates a new certificate.
|
||||
"""
|
||||
from lemur.notifications import service as notification_service
|
||||
cert, private_key, cert_chain = mint(kwargs)
|
||||
kwargs['creator'] = g.user.email
|
||||
cert_body, private_key, cert_chain = mint(**kwargs)
|
||||
kwargs['body'] = cert_body
|
||||
kwargs['private_key'] = private_key
|
||||
kwargs['chain'] = cert_chain
|
||||
|
||||
cert.owner = kwargs['owner']
|
||||
cert = Certificate(**kwargs)
|
||||
|
||||
# we override the generated name if one is provided
|
||||
if kwargs.get('name'):
|
||||
cert.name = kwargs['name']
|
||||
|
||||
database.create(cert)
|
||||
cert.description = kwargs.get('description')
|
||||
g.user.certificates.append(cert)
|
||||
database.update(g.user)
|
||||
database.commit()
|
||||
|
||||
# do this after the certificate has already been created because if it fails to upload to the third party
|
||||
# we do not want to lose the certificate information.
|
||||
database.update_list(cert, 'destinations', Destination, kwargs['destinations'])
|
||||
database.update_list(cert, 'replaces', Certificate, kwargs['replacements'])
|
||||
database.update_list(cert, 'notifications', Notification, kwargs['notifications'])
|
||||
|
||||
# create default notifications for this certificate if none are provided
|
||||
notifications = cert.notifications
|
||||
if not kwargs.get('notifications'):
|
||||
notification_name = "DEFAULT_{0}".format(cert.owner.split('@')[0].upper())
|
||||
notifications += notification_service.create_default_expiration_notifications(notification_name, [cert.owner])
|
||||
|
||||
notification_name = 'DEFAULT_SECURITY'
|
||||
notifications += notification_service.create_default_expiration_notifications(notification_name,
|
||||
current_app.config.get('LEMUR_SECURITY_TEAM_EMAIL'))
|
||||
cert.notifications = notifications
|
||||
|
||||
database.update(cert)
|
||||
metrics.send('certificate_issued', 'counter', 1, metric_tags=dict(owner=cert.owner, issuer=cert.issuer))
|
||||
return cert
|
||||
|
||||
|
@ -351,7 +271,7 @@ def render(args):
|
|||
return database.sort_and_page(query, Certificate, args)
|
||||
|
||||
|
||||
def create_csr(csr_config):
|
||||
def create_csr(**csr_config):
|
||||
"""
|
||||
Given a list of domains create the appropriate csr
|
||||
for those domains
|
||||
|
@ -440,7 +360,7 @@ def create_csr(csr_config):
|
|||
)
|
||||
|
||||
# serialize our private key and CSR
|
||||
pem = private_key.private_bytes(
|
||||
private_key = private_key.private_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PrivateFormat.TraditionalOpenSSL, # would like to use PKCS8 but AWS ELBs don't like it
|
||||
encryption_algorithm=serialization.NoEncryption()
|
||||
|
@ -450,7 +370,7 @@ def create_csr(csr_config):
|
|||
encoding=serialization.Encoding.PEM
|
||||
)
|
||||
|
||||
return csr, pem
|
||||
return csr, private_key
|
||||
|
||||
|
||||
def stats(**kwargs):
|
||||
|
|
|
@ -45,7 +45,7 @@
|
|||
</label>
|
||||
|
||||
<div class="col-sm-10">
|
||||
<textarea name="publicCert" ng-model="certificate.publicCert" placeholder="PEM encoded string..."
|
||||
<textarea name="publicCert" ng-model="certificate.body" placeholder="PEM encoded string..."
|
||||
class="form-control" ng-pattern="/^-----BEGIN CERTIFICATE-----/" required></textarea>
|
||||
|
||||
<p ng-show="uploadForm.publicCert.$invalid && !uploadForm.publicCert.$pristine" class="help-block">Enter
|
||||
|
@ -73,7 +73,7 @@
|
|||
</label>
|
||||
|
||||
<div class="col-sm-10">
|
||||
<textarea name="intermediateCert" ng-model="certificate.intermediateCert"
|
||||
<textarea name="intermediateCert" ng-model="certificate.chain"
|
||||
placeholder="PEM encoded string..." class="form-control"
|
||||
ng-pattern="/^-----BEGIN CERTIFICATE-----/"></textarea>
|
||||
|
||||
|
|
|
@ -31,7 +31,7 @@ LEMUR_RESTRICTED_DOMAINS = []
|
|||
# Lemur currently only supports SES for sending email, this address
|
||||
# needs to be verified
|
||||
LEMUR_EMAIL = ''
|
||||
LEMUR_SECURITY_TEAM_EMAIL = []
|
||||
LEMUR_SECURITY_TEAM_EMAIL = ['security@example.com']
|
||||
|
||||
# Logging
|
||||
|
||||
|
|
|
@ -58,6 +58,7 @@ def db(app, request):
|
|||
|
||||
_db.session.commit()
|
||||
yield _db
|
||||
_db.drop_all()
|
||||
|
||||
|
||||
@pytest.yield_fixture(scope="function")
|
||||
|
@ -120,15 +121,33 @@ def user(session):
|
|||
return {'user': u, 'token': token}
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def admin_user(session):
|
||||
u = UserFactory()
|
||||
admin_role = RoleFactory(name='admin')
|
||||
u.roles.append(admin_role)
|
||||
session.commit()
|
||||
user_token = create_token(u)
|
||||
token = {'Authorization': 'Basic ' + user_token}
|
||||
return {'user': u, 'token': token}
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def issuer_plugin():
|
||||
from lemur.plugins.base import register
|
||||
from .plugins.issuer_plugin import TestIssuerPlugin
|
||||
register(TestIssuerPlugin)
|
||||
|
||||
|
||||
@pytest.yield_fixture(scope="function")
|
||||
def logged_in_user(app, user):
|
||||
def logged_in_user(app):
|
||||
with app.test_request_context():
|
||||
identity_changed.send(current_app._get_current_object(), identity=Identity(user.id))
|
||||
identity_changed.send(current_app._get_current_object(), identity=Identity(1))
|
||||
yield
|
||||
|
||||
|
||||
@pytest.yield_fixture(scope="function")
|
||||
def logged_in_admin(app, admin_user):
|
||||
def logged_in_admin(app):
|
||||
with app.test_request_context():
|
||||
identity_changed.send(current_app._get_current_object(), identity=Identity(admin_user.id))
|
||||
identity_changed.send(current_app._get_current_object(), identity=Identity(2))
|
||||
yield
|
||||
|
|
|
@ -30,7 +30,7 @@ class AuthorityFactory(BaseFactory):
|
|||
"""Authority factory."""
|
||||
name = Sequence(lambda n: 'authority{0}'.format(n))
|
||||
owner = 'joe@example.com'
|
||||
plugin_name = 'TheRing'
|
||||
plugin_name = 'test-issuer'
|
||||
body = INTERNAL_VALID_LONG_STR
|
||||
|
||||
class Meta:
|
||||
|
|
|
@ -0,0 +1,23 @@
|
|||
from lemur.plugins.bases import IssuerPlugin
|
||||
|
||||
from lemur.tests.vectors import INTERNAL_VALID_SAN_STR, INTERNAL_VALID_LONG_STR
|
||||
|
||||
|
||||
class TestIssuerPlugin(IssuerPlugin):
|
||||
title = 'Test'
|
||||
slug = 'test-issuer'
|
||||
description = 'Enables testing'
|
||||
|
||||
author = 'Kevin Glisson'
|
||||
author_url = 'https://github.com/netflix/lemur.git'
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(TestIssuerPlugin, self).__init__(*args, **kwargs)
|
||||
|
||||
def create_certificate(self, csr, issuer_options):
|
||||
return INTERNAL_VALID_LONG_STR, INTERNAL_VALID_SAN_STR
|
||||
|
||||
@staticmethod
|
||||
def create_authority(options):
|
||||
role = {'username': '', 'password': '', 'name': 'test'}
|
||||
return INTERNAL_VALID_SAN_STR, "", [role]
|
|
@ -5,7 +5,8 @@ import json
|
|||
|
||||
from lemur.certificates.views import * # noqa
|
||||
|
||||
from .vectors import VALID_ADMIN_HEADER_TOKEN, VALID_USER_HEADER_TOKEN
|
||||
from .vectors import VALID_ADMIN_HEADER_TOKEN, VALID_USER_HEADER_TOKEN, CSR_STR, \
|
||||
INTERNAL_VALID_LONG_STR, INTERNAL_VALID_SAN_STR, PRIVATE_KEY_STR
|
||||
|
||||
|
||||
def test_authority_identifier_schema():
|
||||
|
@ -285,7 +286,7 @@ def test_create_basic_csr(client):
|
|||
location='A place',
|
||||
extensions=dict(names=dict(sub_alt_names=['test.example.com', 'test2.example.com']))
|
||||
)
|
||||
csr, pem = create_csr(csr_config)
|
||||
csr, pem = create_csr(**csr_config)
|
||||
|
||||
private_key = serialization.load_pem_private_key(pem, password=None, backend=default_backend())
|
||||
csr = x509.load_pem_x509_csr(csr, default_backend())
|
||||
|
@ -305,9 +306,66 @@ def test_get_account_number(client):
|
|||
assert get_account_number(arn) == '11111111'
|
||||
|
||||
|
||||
def test_mint_certificate(issuer_plugin, authority, logged_in_admin):
|
||||
from lemur.certificates.service import mint
|
||||
cert_body, private_key, chain = mint(authority=authority, csr=CSR_STR)
|
||||
assert cert_body == INTERNAL_VALID_LONG_STR, INTERNAL_VALID_SAN_STR
|
||||
|
||||
|
||||
def test_create_certificate(issuer_plugin, authority, logged_in_admin):
|
||||
from lemur.certificates.service import create
|
||||
cert = create(authority=authority, csr=CSR_STR, owner='joe@example.com')
|
||||
assert str(cert.not_after) == '2040-01-01 20:30:52'
|
||||
assert str(cert.not_before) == '2015-06-26 20:30:52'
|
||||
assert cert.issuer == 'Example'
|
||||
assert cert.name == 'long.lived.com-Example-20150626-20400101'
|
||||
|
||||
cert = create(authority=authority, csr=CSR_STR, owner='joe@example.com', name='ACustomName1')
|
||||
assert cert.name == 'ACustomName1'
|
||||
|
||||
|
||||
def test_create_csr():
|
||||
from lemur.certificates.service import create_csr
|
||||
|
||||
csr, private_key = create_csr(common_name='ACommonName', organization='test', organizational_unit='Meters', country='US',
|
||||
state='CA', location='Here')
|
||||
assert csr
|
||||
assert private_key
|
||||
|
||||
extensions = {'sub_alt_names': {'names': [{'name_type': 'DNSName', 'value': 'AnotherCommonName'}]}}
|
||||
csr, private_key = create_csr(common_name='ACommonName', organization='test', organizational_unit='Meters', country='US',
|
||||
state='CA', location='Here', extensions=extensions)
|
||||
assert csr
|
||||
assert private_key
|
||||
|
||||
|
||||
def test_import(logged_in_user):
|
||||
from lemur.certificates.service import import_certificate
|
||||
cert = import_certificate(body=INTERNAL_VALID_LONG_STR, chain=INTERNAL_VALID_SAN_STR, private_key=PRIVATE_KEY_STR)
|
||||
assert str(cert.not_after) == '2040-01-01 20:30:52'
|
||||
assert str(cert.not_before) == '2015-06-26 20:30:52'
|
||||
assert cert.issuer == 'Example'
|
||||
assert cert.name == 'long.lived.com-Example-20150626-20400101-1'
|
||||
|
||||
cert = import_certificate(body=INTERNAL_VALID_LONG_STR, chain=INTERNAL_VALID_SAN_STR, private_key=PRIVATE_KEY_STR, owner='joe@example.com', name='ACustomName2')
|
||||
assert cert.name == 'ACustomName2'
|
||||
|
||||
|
||||
def test_upload(logged_in_user):
|
||||
from lemur.certificates.service import upload
|
||||
cert = upload(body=INTERNAL_VALID_LONG_STR, chain=INTERNAL_VALID_SAN_STR, private_key=PRIVATE_KEY_STR, owner='joe@example.com')
|
||||
assert str(cert.not_after) == '2040-01-01 20:30:52'
|
||||
assert str(cert.not_before) == '2015-06-26 20:30:52'
|
||||
assert cert.issuer == 'Example'
|
||||
assert cert.name == 'long.lived.com-Example-20150626-20400101-2'
|
||||
|
||||
cert = upload(body=INTERNAL_VALID_LONG_STR, chain=INTERNAL_VALID_SAN_STR, private_key=PRIVATE_KEY_STR, owner='joe@example.com', name='ACustomName')
|
||||
assert cert.name == 'ACustomName'
|
||||
|
||||
|
||||
@pytest.mark.parametrize("token,status", [
|
||||
(VALID_USER_HEADER_TOKEN, 404),
|
||||
(VALID_ADMIN_HEADER_TOKEN, 404),
|
||||
(VALID_USER_HEADER_TOKEN, 200),
|
||||
(VALID_ADMIN_HEADER_TOKEN, 200),
|
||||
('', 401)
|
||||
])
|
||||
def test_certificate_get(client, token, status):
|
||||
|
@ -396,8 +454,8 @@ def test_certificates_patch(client, token, status):
|
|||
|
||||
|
||||
@pytest.mark.parametrize("token,status", [
|
||||
(VALID_USER_HEADER_TOKEN, 404),
|
||||
(VALID_ADMIN_HEADER_TOKEN, 404),
|
||||
(VALID_USER_HEADER_TOKEN, 403),
|
||||
(VALID_ADMIN_HEADER_TOKEN, 200),
|
||||
('', 401)
|
||||
])
|
||||
def test_certificate_credentials_get(client, token, status):
|
||||
|
|
|
@ -25,8 +25,8 @@ def test_notification_input_schema(client, notification):
|
|||
|
||||
|
||||
@pytest.mark.parametrize("token,status", [
|
||||
(VALID_USER_HEADER_TOKEN, 404),
|
||||
(VALID_ADMIN_HEADER_TOKEN, 404),
|
||||
(VALID_USER_HEADER_TOKEN, 200),
|
||||
(VALID_ADMIN_HEADER_TOKEN, 200),
|
||||
('', 401)
|
||||
])
|
||||
def test_notification_get(client, token, status):
|
||||
|
|
|
@ -189,3 +189,56 @@ t5Gpocpt77LJnNiszXSerj/KjX2MflY5xUXeekWowLVTBOK5+CZ8+XBIgBt1hIG3
|
|||
XKxcRgm/Va4QMEAnec0qXfdTVJaJiAW0bdKwKRRrrbwcTdNRGibdng==
|
||||
-----END RSA PRIVATE KEY-----
|
||||
"""
|
||||
|
||||
|
||||
CSR_STR = b"""
|
||||
-----BEGIN CERTIFICATE REQUEST-----
|
||||
MIIC1zCCAb8CAQAwczEUMBIGA1UEAwwLQUNvbW1vbk5hbWUxFTATBgNVBAoMDG9y
|
||||
Z2FuaXphdGlvbjEOMAwGA1UECwwFZ3VuaXQxCzAJBgNVBAYTAlVTMRMwEQYDVQQI
|
||||
DApDYWxpZm9ybmlhMRIwEAYDVQQHDAlzb21ld2hlcmUwggEiMA0GCSqGSIb3DQEB
|
||||
AQUAA4IBDwAwggEKAoIBAQDNnY+Ap+V9+Eg/PAtd7bq27D7tDvbL10AysNUSazy7
|
||||
gJyHfJyE3oiXm28zjFNzRQ35qhsCFpWg8M36FpdP9fIFG9sVXV/ye+YNBkZ2aTJi
|
||||
RnbErZcy8qc+2MRd2JKE9g0pISp9hAEeEPLTwSoGqf5VqOaBehBqL5OKNUr7JAxV
|
||||
TIH1oVU87w/6xg/WsUiyPo49WXxF/3DZNP1UOTYiffxIiARhTb9EtlXpt5iOlic3
|
||||
w/vBX6qsH++XJIus2WE+ABlAVUQTCvc6bgpu4zjc8nlm3ClqkAKcxn2ubEder+Fh
|
||||
hagMYGsbYG+/IWrKYN6S0BjE26tNMiOlmIebimjEdFpnAgMBAAGgHzAdBgkqhkiG
|
||||
9w0BCQ4xEDAOMAwGA1UdEwEB/wQCMAAwDQYJKoZIhvcNAQELBQADggEBAE5OKI/n
|
||||
b1ZRJDL4SpjWggRjfwBdYmb96lGH0aGDoVUP9UUusLzpWLtutkgr9Hh29agSsLZF
|
||||
j535NeXHf+Jc4UyR288WQVJthgAT1e5+jBNPxz4IcTnDW7ZMJLGm495XaKi6Krcg
|
||||
+8Qn2+h04jBTbN2Z9+MXGak0B8ycrbDx/FYL4KgBJRvS805d43zC6L1aUfRbpZgN
|
||||
QeQoBdLhFNB1kAYSWCyETwRQOeGEphBJYBPcXsQVBWbMtLpbhjRZ1uTVZEFIh8Oa
|
||||
zm3Cn4Ul8DO26w9QS4fmZjmnPOZFXYMWoOR6osHzb62PWQ8FBMqXcdToBV2Q9Iw4
|
||||
PiFAxlc0tVjlLqQ=
|
||||
-----END CERTIFICATE REQUEST-----
|
||||
"""
|
||||
|
||||
|
||||
CSR_PEM_STR = b"""
|
||||
-----BEGIN RSA PRIVATE KEY-----
|
||||
MIIEpgIBAAKCAQEAzZ2PgKflffhIPzwLXe26tuw+7Q72y9dAMrDVEms8u4Cch3yc
|
||||
hN6Il5tvM4xTc0UN+aobAhaVoPDN+haXT/XyBRvbFV1f8nvmDQZGdmkyYkZ2xK2X
|
||||
MvKnPtjEXdiShPYNKSEqfYQBHhDy08EqBqn+VajmgXoQai+TijVK+yQMVUyB9aFV
|
||||
PO8P+sYP1rFIsj6OPVl8Rf9w2TT9VDk2In38SIgEYU2/RLZV6beYjpYnN8P7wV+q
|
||||
rB/vlySLrNlhPgAZQFVEEwr3Om4KbuM43PJ5ZtwpapACnMZ9rmxHXq/hYYWoDGBr
|
||||
G2BvvyFqymDektAYxNurTTIjpZiHm4poxHRaZwIDAQABAoIBAQCm5MwVBrKtI/ko
|
||||
colbbVoPngSZkHrcC9SNEKFyON7r5sGm64t0AdjnDgAd3DnkJ1nnm54efMxo/OyD
|
||||
oRCik6QlZ23VkpwNi2m4iq5o8Iw33rAKhkhizzjXN0V0UxTinYEjMEt348ywZdtj
|
||||
67c7/4F0cArhb32hYwqjtQwuex0Tofb37Aj5rNPv1n8ytbhz1vriAVZHZEcjdjdn
|
||||
CSVeblufORQKRzK3wW5nRN721b9gSvEJHfHeNpXQO9X8Yl5tn7UxjoQWXLZK+khv
|
||||
pawN8BFt7lVLkQR14Nq7bKwuJ6KR1ig698a1Ii8Luyh2BgfIc25ryuzs8fFCioKi
|
||||
TK/nzMQ5AoGBAPxtbTrkTTNQ03hnfefRVKGwNHPqLhIQpI99FC4yLYYHsUwFmMVR
|
||||
ccg4aqNUtI0zn1snKC58NICxIPP9c0NuHqBuNuwhuRPINfQfjg/aOpE2QycZcew1
|
||||
BQxXH5d3zXWKLpN15kIS2s18/MpNgTFx2Z0EGqLezDXs6JaPJkqg04glAoGBANCG
|
||||
h106B9hbuTPYCAlTwvaoWnbaxLmtlWzRpqYBuiiBPGvLc545faXkJKb5/zd002kK
|
||||
wblGrPtCnhTvCtHbTg/KuR/R8EsAriPhpWK+N4hCADJ8SLOxMU5S9O24FEK50ltN
|
||||
Q84LS2Wo9fWXQhojiBrctn/ws5ngRCfUbhQeA3ybAoGBAOW7vWaUswIZ9GwnXDon
|
||||
lGuXDxXTslw0k2AXyM8GUdIinCSBD3m9Vt2PItZFWBEOQ2DVMUelOK9LBZ+pMkbT
|
||||
KMJ/rDKZunQbiacFNOiOhzDzfohOKxV7Z33EqPbUTMRFn4ALFCVcPZA4yWRgx0y1
|
||||
vgSd4JQMSzRkyYWFAKd42SuVAoGBAIG84aWQQGdNkin+Y+mhsrCSSE6giDtaE5jz
|
||||
y7KHapJe7f/HQnUUIee/zUoSSsbvKcW2CpfCsEdXyFEP9PRidOwAXjO9A7s2fiIW
|
||||
9zY7UQO2xLakevtJ6HppxLfOitSFFqr1pJUik9N5TyZw6JCowLqtzeJGGQhI7z60
|
||||
vZRIpDS3AoGBAPYJgNB7EcWj5U39ol+8cofG1kPauoEaildTur5ftzyzjy4DXrOW
|
||||
sfU/xhUp6EKLGSXEPqeXAWR6ARf1F4U9Ozp6KA93lGSrSY561jKoqhxxOXAf5il2
|
||||
p7Fzh2CckUeiGd5el+h2WUCOcgtlPlfRyV/Mlvx1H0gFieGucXTP23Ox
|
||||
-----END RSA PRIVATE KEY-----
|
||||
"""
|
||||
|
|
Loading…
Reference in New Issue