Certificate edit: update role and notification with owner change
This commit is contained in:
parent
b59c76d630
commit
d52e0d4e09
@ -23,6 +23,7 @@ from lemur.domains.schemas import DomainNestedOutputSchema
|
||||
from lemur.notifications import service as notification_service
|
||||
from lemur.notifications.schemas import NotificationNestedOutputSchema
|
||||
from lemur.policies.schemas import RotationPolicyNestedOutputSchema
|
||||
from lemur.roles import service as roles_service
|
||||
from lemur.roles.schemas import RoleNestedOutputSchema
|
||||
from lemur.schemas import (
|
||||
AssociatedAuthoritySchema,
|
||||
@ -184,13 +185,33 @@ class CertificateEditInputSchema(CertificateSchema):
|
||||
data["replaces"] = data[
|
||||
"replacements"
|
||||
] # TODO remove when field is deprecated
|
||||
|
||||
if data.get("owner"):
|
||||
# Check if role already exists. This avoids adding duplicate role.
|
||||
if data.get("roles") and any(r.get("name") == data["owner"] for r in data["roles"]):
|
||||
return data
|
||||
|
||||
# Add required role
|
||||
owner_role = roles_service.get_or_create(
|
||||
data["owner"],
|
||||
description="Auto generated role based on owner: {0}".format(data["owner"])
|
||||
)
|
||||
|
||||
# Put role info in correct format using RoleNestedOutputSchema
|
||||
owner_role_dict = RoleNestedOutputSchema().dump(owner_role).data
|
||||
if data.get("roles"):
|
||||
data["roles"].append(owner_role_dict)
|
||||
else:
|
||||
data["roles"] = [owner_role_dict]
|
||||
|
||||
return data
|
||||
|
||||
@post_load
|
||||
def enforce_notifications(self, data):
|
||||
"""
|
||||
Ensures that when an owner changes, default notifications are added for the new owner.
|
||||
Old owner notifications are retained unless explicitly removed.
|
||||
Add default notification for current owner if none exist.
|
||||
This ensures that the default notifications are added in the even of owner change.
|
||||
Old owner notifications are retained unless explicitly removed later in the code path.
|
||||
:param data:
|
||||
:return:
|
||||
"""
|
||||
@ -198,11 +219,18 @@ class CertificateEditInputSchema(CertificateSchema):
|
||||
notification_name = "DEFAULT_{0}".format(
|
||||
data["owner"].split("@")[0].upper()
|
||||
)
|
||||
|
||||
# Even if one default role exists, return
|
||||
# This allows a User to remove unwanted default notification for current owner
|
||||
if any(n.label.startswith(notification_name) for n in data["notifications"]):
|
||||
return data
|
||||
|
||||
data[
|
||||
"notifications"
|
||||
] += notification_service.create_default_expiration_notifications(
|
||||
notification_name, [data["owner"]]
|
||||
)
|
||||
|
||||
return data
|
||||
|
||||
|
||||
|
@ -256,6 +256,14 @@ def update(cert_id, **kwargs):
|
||||
return database.update(cert)
|
||||
|
||||
|
||||
def cleanup_owner_roles_notification(owner_name, kwargs):
|
||||
kwargs["roles"] = [r for r in kwargs["roles"] if r.name != owner_name]
|
||||
notification_prefix = "DEFAULT_{0}".format(
|
||||
owner_name.split("@")[0].upper()
|
||||
)
|
||||
kwargs["notifications"] = [n for n in kwargs["notifications"] if not n.label.startswith(notification_prefix)]
|
||||
|
||||
|
||||
def update_notify(cert, notify_flag):
|
||||
"""
|
||||
Toggle notification value which is a boolean
|
||||
@ -268,16 +276,13 @@ def update_notify(cert, notify_flag):
|
||||
|
||||
|
||||
def create_certificate_roles(**kwargs):
|
||||
# create an role for the owner and assign it
|
||||
owner_role = role_service.get_by_name(kwargs["owner"])
|
||||
|
||||
if not owner_role:
|
||||
owner_role = role_service.create(
|
||||
kwargs["owner"],
|
||||
description="Auto generated role based on owner: {0}".format(
|
||||
kwargs["owner"]
|
||||
),
|
||||
# create a role for the owner and assign it
|
||||
owner_role = role_service.get_or_create(
|
||||
kwargs["owner"],
|
||||
description="Auto generated role based on owner: {0}".format(
|
||||
kwargs["owner"]
|
||||
)
|
||||
)
|
||||
|
||||
# ensure that the authority's owner is also associated with the certificate
|
||||
if kwargs.get("authority"):
|
||||
|
@ -884,6 +884,10 @@ class Certificates(AuthenticatedResource):
|
||||
400,
|
||||
)
|
||||
|
||||
# if owner is changed, remove all notifications and roles associated with old owner
|
||||
if cert.owner != data["owner"]:
|
||||
service.cleanup_owner_roles_notification(cert.owner, data)
|
||||
|
||||
cert = service.update(certificate_id, **data)
|
||||
log_service.create(g.current_user, "update_cert", certificate=cert)
|
||||
return cert
|
||||
|
@ -128,3 +128,11 @@ def render(args):
|
||||
query = database.filter(query, Role, terms)
|
||||
|
||||
return database.sort_and_page(query, Role, args)
|
||||
|
||||
|
||||
def get_or_create(role_name, description):
|
||||
role = get_by_name(role_name)
|
||||
if not role:
|
||||
role = create(name=role_name, description=description)
|
||||
|
||||
return role
|
||||
|
@ -180,7 +180,10 @@ def test_certificate_edit_schema(session):
|
||||
|
||||
input_data = {"owner": "bob@example.com"}
|
||||
data, errors = CertificateEditInputSchema().load(input_data)
|
||||
|
||||
assert not errors
|
||||
assert len(data["notifications"]) == 3
|
||||
assert data["roles"][0].name == input_data["owner"]
|
||||
|
||||
|
||||
def test_authority_key_identifier_schema():
|
||||
@ -970,6 +973,9 @@ def test_certificate_put_with_data(client, certificate, issuer_plugin):
|
||||
headers=VALID_ADMIN_HEADER_TOKEN,
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
assert len(certificate.notifications) == 3
|
||||
assert certificate.roles[0].name == "bob@example.com"
|
||||
assert certificate.notify
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
|
Loading…
Reference in New Issue
Block a user