Merge pull request #2388 from rmoesbergen/master
Implement certificates 'delete' API call
This commit is contained in:
commit
1a2712cdf1
|
@ -6,6 +6,7 @@
|
||||||
.. moduleauthor:: Kevin Glisson <kglisson@netflix.com>
|
.. moduleauthor:: Kevin Glisson <kglisson@netflix.com>
|
||||||
"""
|
"""
|
||||||
import base64
|
import base64
|
||||||
|
import arrow
|
||||||
from builtins import str
|
from builtins import str
|
||||||
|
|
||||||
from flask import Blueprint, make_response, jsonify, g
|
from flask import Blueprint, make_response, jsonify, g
|
||||||
|
@ -660,6 +661,51 @@ class Certificates(AuthenticatedResource):
|
||||||
log_service.create(g.current_user, 'update_cert', certificate=cert)
|
log_service.create(g.current_user, 'update_cert', certificate=cert)
|
||||||
return cert
|
return cert
|
||||||
|
|
||||||
|
def delete(self, certificate_id, data=None):
|
||||||
|
"""
|
||||||
|
.. http:delete:: /certificates/1
|
||||||
|
|
||||||
|
Delete a certificate
|
||||||
|
|
||||||
|
**Example request**:
|
||||||
|
|
||||||
|
.. sourcecode:: http
|
||||||
|
|
||||||
|
DELETE /certificates/1 HTTP/1.1
|
||||||
|
Host: example.com
|
||||||
|
|
||||||
|
**Example response**:
|
||||||
|
|
||||||
|
.. sourcecode:: http
|
||||||
|
|
||||||
|
HTTP/1.1 200 OK
|
||||||
|
|
||||||
|
:reqheader Authorization: OAuth token to authenticate
|
||||||
|
:statuscode 204: no error
|
||||||
|
:statuscode 403: unauthenticated
|
||||||
|
:statusoode 404: certificate not found
|
||||||
|
|
||||||
|
"""
|
||||||
|
cert = service.get(certificate_id)
|
||||||
|
|
||||||
|
if not cert:
|
||||||
|
return dict(message="Cannot find specified certificate"), 404
|
||||||
|
|
||||||
|
# allow creators
|
||||||
|
if g.current_user != cert.user:
|
||||||
|
owner_role = role_service.get_by_name(cert.owner)
|
||||||
|
permission = CertificatePermission(owner_role, [x.name for x in cert.roles])
|
||||||
|
|
||||||
|
if not permission.can():
|
||||||
|
return dict(message='You are not authorized to delete this certificate'), 403
|
||||||
|
|
||||||
|
if arrow.get(cert.not_after) > arrow.utcnow():
|
||||||
|
return dict(message='Certificate is still valid, only expired certificates can be deleted'), 412
|
||||||
|
|
||||||
|
service.update(certificate_id, deleted=True)
|
||||||
|
log_service.create(g.current_user, 'delete_cert', certificate=cert)
|
||||||
|
return '', 204
|
||||||
|
|
||||||
|
|
||||||
class NotificationCertificatesList(AuthenticatedResource):
|
class NotificationCertificatesList(AuthenticatedResource):
|
||||||
""" Defines the 'certificates' endpoint """
|
""" Defines the 'certificates' endpoint """
|
||||||
|
|
|
@ -18,6 +18,6 @@ class Log(db.Model):
|
||||||
__tablename__ = 'logs'
|
__tablename__ = 'logs'
|
||||||
id = Column(Integer, primary_key=True)
|
id = Column(Integer, primary_key=True)
|
||||||
certificate_id = Column(Integer, ForeignKey('certificates.id'))
|
certificate_id = Column(Integer, ForeignKey('certificates.id'))
|
||||||
log_type = Column(Enum('key_view', 'create_cert', 'update_cert', 'revoke_cert', name='log_type'), nullable=False)
|
log_type = Column(Enum('key_view', 'create_cert', 'update_cert', 'revoke_cert', 'delete_cert', name='log_type'), nullable=False)
|
||||||
logged_at = Column(ArrowType(), PassiveDefault(func.now()), nullable=False)
|
logged_at = Column(ArrowType(), PassiveDefault(func.now()), nullable=False)
|
||||||
user_id = Column(Integer, ForeignKey('users.id'), nullable=False)
|
user_id = Column(Integer, ForeignKey('users.id'), nullable=False)
|
||||||
|
|
|
@ -0,0 +1,22 @@
|
||||||
|
""" Add delete_cert to log_type enum
|
||||||
|
|
||||||
|
Revision ID: 9f79024fe67b
|
||||||
|
Revises: ee827d1e1974
|
||||||
|
Create Date: 2019-01-03 15:36:59.181911
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
# revision identifiers, used by Alembic.
|
||||||
|
revision = '9f79024fe67b'
|
||||||
|
down_revision = 'ee827d1e1974'
|
||||||
|
|
||||||
|
from alembic import op
|
||||||
|
import sqlalchemy as sa
|
||||||
|
|
||||||
|
|
||||||
|
def upgrade():
|
||||||
|
op.sync_enum_values('public', 'log_type', ['create_cert', 'key_view', 'revoke_cert', 'update_cert'], ['create_cert', 'delete_cert', 'key_view', 'revoke_cert', 'update_cert'])
|
||||||
|
|
||||||
|
|
||||||
|
def downgrade():
|
||||||
|
op.sync_enum_values('public', 'log_type', ['create_cert', 'delete_cert', 'key_view', 'revoke_cert', 'update_cert'], ['create_cert', 'key_view', 'revoke_cert', 'update_cert'])
|
|
@ -14,7 +14,8 @@ from lemur.tests.vectors import SAN_CERT_KEY, INTERMEDIATE_KEY
|
||||||
|
|
||||||
from .factories import ApiKeyFactory, AuthorityFactory, NotificationFactory, DestinationFactory, \
|
from .factories import ApiKeyFactory, AuthorityFactory, NotificationFactory, DestinationFactory, \
|
||||||
CertificateFactory, UserFactory, RoleFactory, SourceFactory, EndpointFactory, \
|
CertificateFactory, UserFactory, RoleFactory, SourceFactory, EndpointFactory, \
|
||||||
RotationPolicyFactory, PendingCertificateFactory, AsyncAuthorityFactory, CryptoAuthorityFactory
|
RotationPolicyFactory, PendingCertificateFactory, AsyncAuthorityFactory, InvalidCertificateFactory, \
|
||||||
|
CryptoAuthorityFactory
|
||||||
|
|
||||||
|
|
||||||
def pytest_runtest_setup(item):
|
def pytest_runtest_setup(item):
|
||||||
|
@ -167,6 +168,15 @@ def pending_certificate(session):
|
||||||
return p
|
return p
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def invalid_certificate(session):
|
||||||
|
u = UserFactory()
|
||||||
|
a = AsyncAuthorityFactory()
|
||||||
|
i = InvalidCertificateFactory(user=u, authority=a)
|
||||||
|
session.commit()
|
||||||
|
return i
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def admin_user(session):
|
def admin_user(session):
|
||||||
u = UserFactory()
|
u = UserFactory()
|
||||||
|
|
|
@ -20,7 +20,7 @@ from lemur.policies.models import RotationPolicy
|
||||||
from lemur.api_keys.models import ApiKey
|
from lemur.api_keys.models import ApiKey
|
||||||
|
|
||||||
from .vectors import SAN_CERT_STR, SAN_CERT_KEY, CSR_STR, INTERMEDIATE_CERT_STR, ROOTCA_CERT_STR, INTERMEDIATE_KEY, \
|
from .vectors import SAN_CERT_STR, SAN_CERT_KEY, CSR_STR, INTERMEDIATE_CERT_STR, ROOTCA_CERT_STR, INTERMEDIATE_KEY, \
|
||||||
WILDCARD_CERT_KEY
|
WILDCARD_CERT_KEY, INVALID_CERT_STR
|
||||||
|
|
||||||
|
|
||||||
class BaseFactory(SQLAlchemyModelFactory):
|
class BaseFactory(SQLAlchemyModelFactory):
|
||||||
|
@ -137,6 +137,11 @@ class CACertificateFactory(CertificateFactory):
|
||||||
private_key = INTERMEDIATE_KEY
|
private_key = INTERMEDIATE_KEY
|
||||||
|
|
||||||
|
|
||||||
|
class InvalidCertificateFactory(CertificateFactory):
|
||||||
|
body = INVALID_CERT_STR
|
||||||
|
private_key = ''
|
||||||
|
|
||||||
|
|
||||||
class AuthorityFactory(BaseFactory):
|
class AuthorityFactory(BaseFactory):
|
||||||
"""Authority factory."""
|
"""Authority factory."""
|
||||||
name = Sequence(lambda n: 'authority{0}'.format(n))
|
name = Sequence(lambda n: 'authority{0}'.format(n))
|
||||||
|
|
|
@ -653,15 +653,26 @@ def test_certificate_put_with_data(client, certificate, issuer_plugin):
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("token,status", [
|
@pytest.mark.parametrize("token,status", [
|
||||||
(VALID_USER_HEADER_TOKEN, 405),
|
(VALID_USER_HEADER_TOKEN, 403),
|
||||||
(VALID_ADMIN_HEADER_TOKEN, 405),
|
(VALID_ADMIN_HEADER_TOKEN, 412),
|
||||||
(VALID_ADMIN_API_TOKEN, 405),
|
(VALID_ADMIN_API_TOKEN, 412),
|
||||||
('', 405)
|
('', 401)
|
||||||
])
|
])
|
||||||
def test_certificate_delete(client, token, status):
|
def test_certificate_delete(client, token, status):
|
||||||
assert client.delete(api.url_for(Certificates, certificate_id=1), headers=token).status_code == status
|
assert client.delete(api.url_for(Certificates, certificate_id=1), headers=token).status_code == status
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("token,status", [
|
||||||
|
(VALID_USER_HEADER_TOKEN, 403),
|
||||||
|
(VALID_ADMIN_HEADER_TOKEN, 204),
|
||||||
|
(VALID_ADMIN_API_TOKEN, 204),
|
||||||
|
('', 401)
|
||||||
|
])
|
||||||
|
def test_invalid_certificate_delete(client, invalid_certificate, token, status):
|
||||||
|
assert client.delete(
|
||||||
|
api.url_for(Certificates, certificate_id=invalid_certificate.id), headers=token).status_code == status
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("token,status", [
|
@pytest.mark.parametrize("token,status", [
|
||||||
(VALID_USER_HEADER_TOKEN, 405),
|
(VALID_USER_HEADER_TOKEN, 405),
|
||||||
(VALID_ADMIN_HEADER_TOKEN, 405),
|
(VALID_ADMIN_HEADER_TOKEN, 405),
|
||||||
|
|
Loading…
Reference in New Issue