Initial work on #74. (#514)

* Initial work on #74.

* Fixing tests.

* Adding migration script.

* Excluding migrations from coverage report.
This commit is contained in:
kevgliss
2016-11-21 09:19:14 -08:00
committed by GitHub
parent d45e7d6b85
commit 744e204817
9 changed files with 142 additions and 57 deletions

View File

@ -79,6 +79,7 @@ class Certificate(db.Model):
secondaryjoin=id == certificate_replacement_associations.c.replaced_certificate_id, # noqa
backref='replaced')
views = relationship("View", backref="certificate")
endpoints = relationship("Endpoint", backref='certificate')
def __init__(self, **kwargs):

View File

@ -19,6 +19,7 @@ from lemur.destinations.models import Destination
from lemur.notifications.models import Notification
from lemur.authorities.models import Authority
from lemur.domains.models import Domain
from lemur.users.models import View
from lemur.roles.models import Role
from lemur.roles import service as role_service
@ -129,6 +130,19 @@ def update(cert_id, owner, description, notify, destinations, notifications, rep
return database.update(cert)
def log_private_key_view(certificate, user):
"""
Creates a record each time a certificates private key is viewed.
:param certificate:
:param user:
:return:
"""
view = View(user_id=user.id, certificate_id=certificate.id)
database.add(view)
database.commit()
def create_certificate_roles(**kwargs):
# create an role for the owner and assign it
owner_role = role_service.get_by_name(kwargs['owner'])

View File

@ -437,16 +437,18 @@ class CertificatePrivateKey(AuthenticatedResource):
if not cert:
return dict(message="Cannot find specified certificate"), 404
owner_role = role_service.get_by_name(cert.owner)
permission = CertificatePermission(cert.id, owner_role, [x.name for x in cert.roles])
if not g.current_user.is_admin:
owner_role = role_service.get_by_name(cert.owner)
permission = CertificatePermission(cert.id, owner_role, [x.name for x in cert.roles])
if permission.can():
response = make_response(jsonify(key=cert.private_key), 200)
response.headers['cache-control'] = 'private, max-age=0, no-cache, no-store'
response.headers['pragma'] = 'no-cache'
return response
if not permission.can():
return dict(message='You are not authorized to view this key'), 403
return dict(message='You are not authorized to view this key'), 403
service.log_private_key_view(cert, g.current_user)
response = make_response(jsonify(key=cert.private_key), 200)
response.headers['cache-control'] = 'private, max-age=0, no-cache, no-store'
response.headers['pragma'] = 'no-cache'
return response
class Certificates(AuthenticatedResource):
@ -908,45 +910,40 @@ class CertificateExport(AuthenticatedResource):
"""
cert = service.get(certificate_id)
owner_role = role_service.get_by_name(cert.owner)
permission = CertificatePermission(cert.id, owner_role, [x.name for x in cert.roles])
if not cert:
return dict(message="Cannot find specified certificate"), 404
options = data['plugin']['plugin_options']
plugin = data['plugin']['plugin_object']
if plugin.requires_key:
if cert.private_key:
if permission.can():
extension, passphrase, data = plugin.export(cert.body, cert.chain, cert.private_key, options)
else:
return dict(message='You are not authorized to export this certificate.'), 403
if not cert.private_key:
return dict(
message='Unable to export certificate, plugin: {0} requires a private key but no key was found.'.format(
plugin.slug))
else:
return dict(message='Unable to export certificate, plugin: {0} requires a private key but no key was found.'.format(plugin.slug))
else:
extension, passphrase, data = plugin.export(cert.body, cert.chain, cert.private_key, options)
if not g.current_user.is_admin:
owner_role = role_service.get_by_name(cert.owner)
permission = CertificatePermission(cert.id, owner_role, [x.name for x in cert.roles])
if not permission.can():
return dict(message='You are not authorized to export this certificate.'), 403
options = data['plugin']['plugin_options']
service.log_private_key_view(cert, g.current_user)
extension, passphrase, data = plugin.export(cert.body, cert.chain, cert.private_key, options)
# we take a hit in message size when b64 encoding
return dict(extension=extension, passphrase=passphrase, data=base64.b64encode(data).decode('utf-8'))
class CertificateClone(AuthenticatedResource):
def __init__(self):
self.reqparse = reqparse.RequestParser()
super(CertificateExport, self).__init__()
@validate_schema(None, certificate_output_schema)
def get(self, certificate_id):
pass
api.add_resource(CertificatesList, '/certificates', endpoint='certificates')
api.add_resource(Certificates, '/certificates/<int:certificate_id>', endpoint='certificate')
api.add_resource(CertificatesStats, '/certificates/stats', endpoint='certificateStats')
api.add_resource(CertificatesUpload, '/certificates/upload', endpoint='certificateUpload')
api.add_resource(CertificatePrivateKey, '/certificates/<int:certificate_id>/key', endpoint='privateKeyCertificates')
api.add_resource(CertificateExport, '/certificates/<int:certificate_id>/export', endpoint='exportCertificate')
api.add_resource(CertificateClone, '/certificates/<int:certificate_id>/clone', endpoint='cloneCertificate')
api.add_resource(NotificationCertificatesList, '/notifications/<int:notification_id>/certificates',
endpoint='notificationCertificates')
api.add_resource(CertificatesReplacementsList, '/certificates/<int:certificate_id>/replacements',