diff --git a/lemur/certificates/schemas.py b/lemur/certificates/schemas.py index 5da342e5..688d6ba4 100644 --- a/lemur/certificates/schemas.py +++ b/lemur/certificates/schemas.py @@ -23,6 +23,7 @@ from lemur.domains.schemas import DomainNestedOutputSchema from lemur.notifications import service as notification_service from lemur.notifications.schemas import NotificationNestedOutputSchema from lemur.policies.schemas import RotationPolicyNestedOutputSchema +from lemur.roles import service as roles_service from lemur.roles.schemas import RoleNestedOutputSchema from lemur.schemas import ( AssociatedAuthoritySchema, @@ -184,13 +185,33 @@ class CertificateEditInputSchema(CertificateSchema): data["replaces"] = data[ "replacements" ] # TODO remove when field is deprecated + + if data.get("owner"): + # Check if role already exists. This avoids adding duplicate role. + if data.get("roles") and any(r.get("name") == data["owner"] for r in data["roles"]): + return data + + # Add required role + owner_role = roles_service.get_or_create( + data["owner"], + description=f"Auto generated role based on owner: {data['owner']}" + ) + + # Put role info in correct format using RoleNestedOutputSchema + owner_role_dict = RoleNestedOutputSchema().dump(owner_role).data + if data.get("roles"): + data["roles"].append(owner_role_dict) + else: + data["roles"] = [owner_role_dict] + return data @post_load def enforce_notifications(self, data): """ - Ensures that when an owner changes, default notifications are added for the new owner. - Old owner notifications are retained unless explicitly removed. + Add default notification for current owner if none exist. + This ensures that the default notifications are added in the event of owner change. + Old owner notifications are retained unless explicitly removed later in the code path. :param data: :return: """ @@ -198,11 +219,18 @@ class CertificateEditInputSchema(CertificateSchema): notification_name = "DEFAULT_{0}".format( data["owner"].split("@")[0].upper() ) + + # Even if one default role exists, return + # This allows a User to remove unwanted default notification for current owner + if any(n.label.startswith(notification_name) for n in data["notifications"]): + return data + data[ "notifications" ] += notification_service.create_default_expiration_notifications( notification_name, [data["owner"]] ) + return data diff --git a/lemur/certificates/service.py b/lemur/certificates/service.py index b676cffb..6d1bd2ac 100644 --- a/lemur/certificates/service.py +++ b/lemur/certificates/service.py @@ -256,6 +256,12 @@ def update(cert_id, **kwargs): return database.update(cert) +def cleanup_owner_roles_notification(owner_name, kwargs): + kwargs["roles"] = [r for r in kwargs["roles"] if r.name != owner_name] + notification_prefix = f"DEFAULT_{owner_name.split('@')[0].upper()}" + kwargs["notifications"] = [n for n in kwargs["notifications"] if not n.label.startswith(notification_prefix)] + + def update_notify(cert, notify_flag): """ Toggle notification value which is a boolean @@ -268,16 +274,11 @@ def update_notify(cert, notify_flag): def create_certificate_roles(**kwargs): - # create an role for the owner and assign it - owner_role = role_service.get_by_name(kwargs["owner"]) - - if not owner_role: - owner_role = role_service.create( - kwargs["owner"], - description="Auto generated role based on owner: {0}".format( - kwargs["owner"] - ), - ) + # create a role for the owner and assign it + owner_role = role_service.get_or_create( + kwargs["owner"], + description=f"Auto generated role based on owner: {kwargs['owner']}" + ) # ensure that the authority's owner is also associated with the certificate if kwargs.get("authority"): diff --git a/lemur/certificates/views.py b/lemur/certificates/views.py index 0eaba4e5..18746636 100644 --- a/lemur/certificates/views.py +++ b/lemur/certificates/views.py @@ -884,6 +884,10 @@ class Certificates(AuthenticatedResource): 400, ) + # if owner is changed, remove all notifications and roles associated with old owner + if cert.owner != data["owner"]: + service.cleanup_owner_roles_notification(cert.owner, data) + cert = service.update(certificate_id, **data) log_service.create(g.current_user, "update_cert", certificate=cert) return cert diff --git a/lemur/roles/service.py b/lemur/roles/service.py index 51597d6e..fa4c9c97 100644 --- a/lemur/roles/service.py +++ b/lemur/roles/service.py @@ -128,3 +128,11 @@ def render(args): query = database.filter(query, Role, terms) return database.sort_and_page(query, Role, args) + + +def get_or_create(role_name, description): + role = get_by_name(role_name) + if not role: + role = create(name=role_name, description=description) + + return role diff --git a/lemur/tests/test_certificates.py b/lemur/tests/test_certificates.py index 8403461b..4edc485f 100644 --- a/lemur/tests/test_certificates.py +++ b/lemur/tests/test_certificates.py @@ -180,7 +180,10 @@ def test_certificate_edit_schema(session): input_data = {"owner": "bob@example.com"} data, errors = CertificateEditInputSchema().load(input_data) + + assert not errors assert len(data["notifications"]) == 3 + assert data["roles"][0].name == input_data["owner"] def test_authority_key_identifier_schema(): @@ -970,6 +973,9 @@ def test_certificate_put_with_data(client, certificate, issuer_plugin): headers=VALID_ADMIN_HEADER_TOKEN, ) assert resp.status_code == 200 + assert len(certificate.notifications) == 3 + assert certificate.roles[0].name == "bob@example.com" + assert certificate.notify @pytest.mark.parametrize(