Implement certificates delete API call by marking a cert as 'deleted' in the database. Only certificates that have expired can be deleted.

This commit is contained in:
Ronald Moesbergen 2019-01-21 10:25:28 +01:00
parent cb35f19d6c
commit 4c4fbf3e48
4 changed files with 78 additions and 6 deletions

View File

@ -6,6 +6,7 @@
.. moduleauthor:: Kevin Glisson <kglisson@netflix.com> .. moduleauthor:: Kevin Glisson <kglisson@netflix.com>
""" """
import base64 import base64
import arrow
from builtins import str from builtins import str
from flask import Blueprint, make_response, jsonify, g from flask import Blueprint, make_response, jsonify, g
@ -660,6 +661,51 @@ class Certificates(AuthenticatedResource):
log_service.create(g.current_user, 'update_cert', certificate=cert) log_service.create(g.current_user, 'update_cert', certificate=cert)
return cert return cert
def delete(self, certificate_id, data=None):
"""
.. http:delete:: /certificates/1
Delete a certificate
**Example request**:
.. sourcecode:: http
DELETE /certificates/1 HTTP/1.1
Host: example.com
**Example response**:
.. sourcecode:: http
HTTP/1.1 200 OK
:reqheader Authorization: OAuth token to authenticate
:statuscode 204: no error
:statuscode 403: unauthenticated
:statusoode 404: certificate not found
"""
cert = service.get(certificate_id)
if not cert:
return dict(message="Cannot find specified certificate"), 404
# allow creators
if g.current_user != cert.user:
owner_role = role_service.get_by_name(cert.owner)
permission = CertificatePermission(owner_role, [x.name for x in cert.roles])
if not permission.can():
return dict(message='You are not authorized to delete this certificate'), 403
if arrow.get(cert.not_after) > arrow.utcnow():
return dict(message='Certificate is still valid, only expired certificates can be deleted'), 412
service.update(certificate_id, deleted=True)
log_service.create(g.current_user, 'delete_cert', certificate=cert)
return '', 204
class NotificationCertificatesList(AuthenticatedResource): class NotificationCertificatesList(AuthenticatedResource):
""" Defines the 'certificates' endpoint """ """ Defines the 'certificates' endpoint """

View File

@ -15,7 +15,8 @@ from lemur.tests.vectors import SAN_CERT_KEY, INTERMEDIATE_KEY
from .factories import ApiKeyFactory, AuthorityFactory, NotificationFactory, DestinationFactory, \ from .factories import ApiKeyFactory, AuthorityFactory, NotificationFactory, DestinationFactory, \
CertificateFactory, UserFactory, RoleFactory, SourceFactory, EndpointFactory, \ CertificateFactory, UserFactory, RoleFactory, SourceFactory, EndpointFactory, \
RotationPolicyFactory, PendingCertificateFactory, AsyncAuthorityFactory, CryptoAuthorityFactory RotationPolicyFactory, PendingCertificateFactory, AsyncAuthorityFactory, InvalidCertificateFactory, \
CryptoAuthorityFactory
def pytest_runtest_setup(item): def pytest_runtest_setup(item):
@ -168,6 +169,15 @@ def pending_certificate(session):
return p return p
@pytest.fixture
def invalid_certificate(session):
u = UserFactory()
a = AsyncAuthorityFactory()
i = InvalidCertificateFactory(user=u, authority=a)
session.commit()
return i
@pytest.fixture @pytest.fixture
def admin_user(session): def admin_user(session):
u = UserFactory() u = UserFactory()

View File

@ -20,7 +20,7 @@ from lemur.policies.models import RotationPolicy
from lemur.api_keys.models import ApiKey from lemur.api_keys.models import ApiKey
from .vectors import SAN_CERT_STR, SAN_CERT_KEY, CSR_STR, INTERMEDIATE_CERT_STR, ROOTCA_CERT_STR, INTERMEDIATE_KEY, \ from .vectors import SAN_CERT_STR, SAN_CERT_KEY, CSR_STR, INTERMEDIATE_CERT_STR, ROOTCA_CERT_STR, INTERMEDIATE_KEY, \
WILDCARD_CERT_KEY WILDCARD_CERT_KEY, INVALID_CERT_STR
class BaseFactory(SQLAlchemyModelFactory): class BaseFactory(SQLAlchemyModelFactory):
@ -137,6 +137,11 @@ class CACertificateFactory(CertificateFactory):
private_key = INTERMEDIATE_KEY private_key = INTERMEDIATE_KEY
class InvalidCertificateFactory(CertificateFactory):
body = INVALID_CERT_STR
private_key = ''
class AuthorityFactory(BaseFactory): class AuthorityFactory(BaseFactory):
"""Authority factory.""" """Authority factory."""
name = Sequence(lambda n: 'authority{0}'.format(n)) name = Sequence(lambda n: 'authority{0}'.format(n))

View File

@ -647,15 +647,26 @@ def test_certificate_put_with_data(client, certificate, issuer_plugin):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405), (VALID_USER_HEADER_TOKEN, 403),
(VALID_ADMIN_HEADER_TOKEN, 405), (VALID_ADMIN_HEADER_TOKEN, 412),
(VALID_ADMIN_API_TOKEN, 405), (VALID_ADMIN_API_TOKEN, 412),
('', 405) ('', 401)
]) ])
def test_certificate_delete(client, token, status): def test_certificate_delete(client, token, status):
assert client.delete(api.url_for(Certificates, certificate_id=1), headers=token).status_code == status assert client.delete(api.url_for(Certificates, certificate_id=1), headers=token).status_code == status
@pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 403),
(VALID_ADMIN_HEADER_TOKEN, 204),
(VALID_ADMIN_API_TOKEN, 204),
('', 401)
])
def test_invalid_certificate_delete(client, invalid_certificate, token, status):
assert client.delete(
api.url_for(Certificates, certificate_id=invalid_certificate.id), headers=token).status_code == status
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405), (VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405), (VALID_ADMIN_HEADER_TOKEN, 405),