Marshmallowing roles (#313)

This commit is contained in:
kevgliss 2016-05-10 14:22:22 -07:00
parent 7f790be1e4
commit 9022059dc6
6 changed files with 138 additions and 327 deletions

View File

@ -26,4 +26,4 @@ class Role(db.Model):
description = Column(Text)
authority_id = Column(Integer, ForeignKey('authorities.id'))
user_id = Column(Integer, ForeignKey('users.id'))
users = relationship("User", secondary=roles_users, passive_deletes=True, backref="role", cascade='all,delete')
users = relationship("User", secondary=roles_users, viewonly=True, backref="role")

34
lemur/roles/schemas.py Normal file
View File

@ -0,0 +1,34 @@
"""
.. module: lemur.roles.schemas
:platform: unix
:copyright: (c) 2015 by Netflix Inc., see AUTHORS for more
:license: Apache, see LICENSE for more details.
.. moduleauthor:: Kevin Glisson <kglisson@netflix.com>
"""
from marshmallow import fields
from lemur.common.schema import LemurInputSchema, LemurOutputSchema
from lemur.schemas import AssociatedUserSchema, AssociatedAuthoritySchema
class RoleInputSchema(LemurInputSchema):
name = fields.String(required=True)
username = fields.String()
password = fields.String()
description = fields.String()
authorities = fields.Nested(AssociatedAuthoritySchema, many=True)
users = fields.Nested(AssociatedUserSchema, many=True)
class RoleOutputSchema(LemurOutputSchema):
name = fields.String()
username = fields.String()
password = fields.String()
description = fields.String()
authorities = fields.Nested(AssociatedAuthoritySchema, many=True)
users = fields.Nested(AssociatedUserSchema, many=True)
users = fields.Nested(AssociatedUserSchema, many=True)
role_input_schema = RoleInputSchema()
role_output_schema = RoleOutputSchema()
roles_output_schema = RoleOutputSchema(many=True)

View File

@ -92,10 +92,6 @@ def render(args):
:return:
"""
query = database.session_query(Role)
sort_by = args.pop('sort_by')
sort_dir = args.pop('sort_dir')
page = args.pop('page')
count = args.pop('count')
filt = args.pop('filter')
user_id = args.pop('user_id', None)
authority_id = args.pop('authority_id', None)
@ -117,9 +113,4 @@ def render(args):
terms = filt.split(';')
query = database.filter(query, Role, terms)
query = database.find_all(query, Role, args)
if sort_by and sort_dir:
query = database.sort(query, Role, sort_by, sort_dir)
return database.paginate(query, page, count)
database.sort_and_page(query, Role, args)

View File

@ -9,32 +9,28 @@
"""
from flask import Blueprint
from flask import make_response, jsonify, abort, g
from flask.ext.restful import reqparse, fields, Api
from flask.ext.restful import reqparse, Api
from lemur.roles import service
from lemur.auth.service import AuthenticatedResource
from lemur.auth.permissions import ViewRoleCredentialsPermission, admin_permission
from lemur.common.utils import marshal_items, paginated_parser
from lemur.common.utils import paginated_parser
from lemur.common.schema import validate_schema
from lemur.roles.schemas import role_input_schema, role_output_schema, roles_output_schema
mod = Blueprint('roles', __name__)
api = Api(mod)
FIELDS = {
'name': fields.String,
'description': fields.String,
'id': fields.Integer,
}
class RolesList(AuthenticatedResource):
""" Defines the 'roles' endpoint """
def __init__(self):
self.reqparse = reqparse.RequestParser()
super(RolesList, self).__init__()
@marshal_items(FIELDS)
@validate_schema(None, role_output_schema)
def get(self):
"""
.. http:get:: /roles
@ -90,8 +86,8 @@ class RolesList(AuthenticatedResource):
return service.render(args)
@admin_permission.require(http_exception=403)
@marshal_items(FIELDS)
def post(self):
@validate_schema(role_input_schema, role_output_schema)
def post(self, data=None):
"""
.. http:post:: /roles
@ -136,15 +132,8 @@ class RolesList(AuthenticatedResource):
:statuscode 200: no error
:statuscode 403: unauthenticated
"""
self.reqparse.add_argument('name', type=str, location='json', required=True)
self.reqparse.add_argument('description', type=str, location='json')
self.reqparse.add_argument('username', type=str, location='json')
self.reqparse.add_argument('password', type=str, location='json')
self.reqparse.add_argument('users', type=list, location='json')
args = self.reqparse.parse_args()
return service.create(args['name'], args.get('password'), args.get('description'), args.get('username'),
args.get('users'))
return service.create(data['name'], data.get('password'), data.get('description'), data.get('username'),
data.get('users'))
class RoleViewCredentials(AuthenticatedResource):
@ -197,7 +186,7 @@ class Roles(AuthenticatedResource):
self.reqparse = reqparse.RequestParser()
super(Roles, self).__init__()
@marshal_items(FIELDS)
@validate_schema(None, role_output_schema)
def get(self, role_id):
"""
.. http:get:: /roles/1
@ -238,8 +227,8 @@ class Roles(AuthenticatedResource):
return service.get(role_id)
@marshal_items(FIELDS)
def put(self, role_id):
@validate_schema(role_input_schema, role_output_schema)
def put(self, role_id, data=None):
"""
.. http:put:: /roles/1
@ -278,11 +267,7 @@ class Roles(AuthenticatedResource):
"""
permission = ViewRoleCredentialsPermission(role_id)
if permission.can():
self.reqparse.add_argument('name', type=str, location='json', required=True)
self.reqparse.add_argument('description', type=str, location='json')
self.reqparse.add_argument('users', type=list, location='json')
args = self.reqparse.parse_args()
return service.update(role_id, args['name'], args.get('description'), args.get('users'))
return service.update(role_id, data['name'], data.get('description'), data.get('users'))
abort(403)
@admin_permission.require(http_exception=403)
@ -326,7 +311,7 @@ class UserRolesList(AuthenticatedResource):
self.reqparse = reqparse.RequestParser()
super(UserRolesList, self).__init__()
@marshal_items(FIELDS)
@validate_schema(None, roles_output_schema)
def get(self, user_id):
"""
.. http:get:: /users/1/roles
@ -385,7 +370,7 @@ class AuthorityRolesList(AuthenticatedResource):
self.reqparse = reqparse.RequestParser()
super(AuthorityRolesList, self).__init__()
@marshal_items(FIELDS)
@validate_schema(None, roles_output_schema)
def get(self, authority_id):
"""
.. http:get:: /authorities/1/roles

View File

@ -14,6 +14,7 @@ from lemur.authorities.models import Authority
from lemur.destinations.models import Destination
from lemur.certificates.models import Certificate
from lemur.notifications.models import Notification
from lemur.users.models import User
from lemur.common import validators
from lemur.common.schema import LemurSchema, LemurInputSchema
@ -83,6 +84,18 @@ class AssociatedCertificateSchema(LemurInputSchema):
return Certificate.query.filter(Certificate.id == data['id']).one()
class AssociatedUserSchema(LemurInputSchema):
id = fields.Int(required=True)
@post_load
def get_object(self, data, many=False):
if many:
ids = [d['id'] for d in data]
return User.query.filter(User.id.in_(ids)).all()
else:
return User.query.filter(User.id == data['id']).one()
class PluginSchema(LemurInputSchema):
plugin_options = fields.Dict()
slug = fields.String()

View File

@ -1,311 +1,99 @@
from json import dumps
from lemur.roles.service import * # noqa
import pytest
from lemur.roles.views import * # noqa
def test_crud(session):
role = create('role1')
assert role.id > 0
from .vectors import VALID_ADMIN_HEADER_TOKEN, VALID_USER_HEADER_TOKEN
role = update(role.id, 'role_new', None, [])
assert role.name == 'role_new'
delete(role.id)
assert not get(role.id)
def test_role_input_schema(client):
from lemur.roles.schemas import RoleInputSchema
def test_role_get(client):
assert client.get(api.url_for(Roles, role_id=1)).status_code == 401
input_data = {
'name': 'myRole'
}
data, errors = RoleInputSchema().load(input_data)
def test_role_post(client):
assert client.post(api.url_for(Roles, role_id=1), data={}).status_code == 405
assert not errors
def test_role_put(client):
assert client.put(api.url_for(Roles, role_id=1), data={}).status_code == 401
@pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 200),
(VALID_ADMIN_HEADER_TOKEN, 200),
('', 401)
])
def test_role_get(client, token, status):
assert client.get(api.url_for(Roles, role_id=1), headers=token).status_code == status
def test_role_delete(client):
assert client.delete(api.url_for(Roles, role_id=1)).status_code == 401
@pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405),
('', 405)
])
def test_role_post_(client, token, status):
assert client.post(api.url_for(Roles, role_id=1), data={}, headers=token).status_code == status
def test_role_patch(client):
assert client.patch(api.url_for(Roles, role_id=1), data={}).status_code == 405
@pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 400),
(VALID_ADMIN_HEADER_TOKEN, 400),
('', 401)
])
def test_role_put(client, token, status):
assert client.put(api.url_for(Roles, role_id=1), data={}, headers=token).status_code == status
def test_roles_get(client):
assert client.get(api.url_for(RolesList)).status_code == 401
@pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 403),
(VALID_ADMIN_HEADER_TOKEN, 200),
('', 401)
])
def test_role_delete(client, token, status):
assert client.delete(api.url_for(Roles, role_id=1), headers=token).status_code == status
def test_roles_post(client):
assert client.post(api.url_for(RolesList), data={}).status_code == 401
@pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405),
('', 405)
])
def test_role_patch(client, token, status):
assert client.patch(api.url_for(Roles, role_id=1), data={}, headers=token).status_code == status
def test_roles_put(client):
assert client.put(api.url_for(RolesList), data={}).status_code == 405
@pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 403),
(VALID_ADMIN_HEADER_TOKEN, 400),
('', 401)
])
def test_role_list_post_(client, token, status):
assert client.post(api.url_for(RolesList), data={}, headers=token).status_code == status
def test_roles_delete(client):
assert client.delete(api.url_for(RolesList)).status_code == 405
@pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 404),
(VALID_ADMIN_HEADER_TOKEN, 404),
('', 401)
])
def test_role_list_get(client, token, status):
assert client.get(api.url_for(RolesList), headers=token).status_code == status
def test_roles_patch(client):
assert client.patch(api.url_for(RolesList), data={}).status_code == 405
@pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405),
('', 405)
])
def test_role_list_delete(client, token, status):
assert client.delete(api.url_for(RolesList), headers=token).status_code == status
def test_role_credentials_get(client):
assert client.get(api.url_for(RoleViewCredentials, role_id=1)).status_code == 401
def test_role_credentials_post(client):
assert client.post(api.url_for(RoleViewCredentials, role_id=1), data={}).status_code == 405
def test_role_credentials_put(client):
assert client.put(api.url_for(RoleViewCredentials, role_id=1), data={}).status_code == 405
def test_role_credentials_delete(client):
assert client.delete(api.url_for(RoleViewCredentials, role_id=1)).status_code == 405
def test_role_credentials_patch(client):
assert client.patch(api.url_for(RoleViewCredentials, role_id=1), data={}).status_code == 405
def test_user_roles_get(client):
assert client.get(api.url_for(UserRolesList, user_id=1)).status_code == 401
def test_user_roles_post(client):
assert client.post(api.url_for(UserRolesList, user_id=1), data={}).status_code == 405
def test_user_roles_put(client):
assert client.put(api.url_for(UserRolesList, user_id=1), data={}).status_code == 405
def test_user_roles_delete(client):
assert client.delete(api.url_for(UserRolesList, user_id=1)).status_code == 405
def test_user_roles_patch(client):
assert client.patch(api.url_for(UserRolesList, user_id=1), data={}).status_code == 405
def test_authority_roles_get(client):
assert client.get(api.url_for(AuthorityRolesList, authority_id=1)).status_code == 401
def test_authority_roles_post(client):
assert client.post(api.url_for(AuthorityRolesList, authority_id=1), data={}).status_code == 405
def test_authority_roles_put(client):
assert client.put(api.url_for(AuthorityRolesList, authority_id=1), data={}).status_code == 405
def test_authority_roles_delete(client):
assert client.delete(api.url_for(AuthorityRolesList, authority_id=1)).status_code == 405
def test_authority_roles_patch(client):
assert client.patch(api.url_for(AuthorityRolesList, authority_id=1), data={}).status_code == 405
VALID_USER_HEADER_TOKEN = {
'Authorization': 'Basic ' + 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE0MzUyMzMzNjksInN1YiI6MSwiZXhwIjoxNTIxNTQ2OTY5fQ.1qCi0Ip7mzKbjNh0tVd3_eJOrae3rNa_9MCVdA4WtQI'}
def test_auth_role_get(client):
assert client.get(api.url_for(Roles, role_id=1), headers=VALID_USER_HEADER_TOKEN).status_code == 400
def test_auth_role_post_(client):
assert client.post(api.url_for(Roles, role_id=1), data={}, headers=VALID_USER_HEADER_TOKEN).status_code == 405
def test_auth_role_put(client):
assert client.put(api.url_for(Roles, role_id=1), data={}, headers=VALID_USER_HEADER_TOKEN).status_code == 400
def test_auth_role_delete(client):
assert client.delete(api.url_for(Roles, role_id=1), headers=VALID_USER_HEADER_TOKEN).status_code == 403
def test_auth_role_patch(client):
assert client.patch(api.url_for(Roles, role_id=1), data={}, headers=VALID_USER_HEADER_TOKEN).status_code == 405
def test_auth_roles_get(client):
assert client.get(api.url_for(RolesList), headers=VALID_USER_HEADER_TOKEN).status_code == 200
def test_auth_roles_post(client):
assert client.post(api.url_for(RolesList), data={}, headers=VALID_USER_HEADER_TOKEN).status_code == 403
def test_auth_role_credentials_get(client):
assert client.get(api.url_for(RoleViewCredentials, role_id=1), headers=VALID_USER_HEADER_TOKEN).status_code == 403
def test_auth_role_credentials_post(client):
assert client.post(api.url_for(RoleViewCredentials, role_id=1), data={}, headers=VALID_USER_HEADER_TOKEN).status_code == 405
def test_auth_role_credentials_put(client):
assert client.put(api.url_for(RoleViewCredentials, role_id=1), data={}, headers=VALID_USER_HEADER_TOKEN).status_code == 405
def test_auth_role_credentials_delete(client):
assert client.delete(api.url_for(RoleViewCredentials, role_id=1), headers=VALID_USER_HEADER_TOKEN).status_code == 405
def test_auth_role_credentials_patch(client):
assert client.patch(api.url_for(RoleViewCredentials, role_id=1), data={}, headers=VALID_USER_HEADER_TOKEN).status_code == 405
def test_auth_user_roles_get(client):
assert client.get(api.url_for(UserRolesList, user_id=1), headers=VALID_USER_HEADER_TOKEN).status_code == 200
def test_auth_user_roles_post(client):
assert client.post(api.url_for(UserRolesList, user_id=1), data={}, headers=VALID_USER_HEADER_TOKEN).status_code == 405
def test_auth_user_roles_put(client):
assert client.put(api.url_for(UserRolesList, user_id=1), data={}, headers=VALID_USER_HEADER_TOKEN).status_code == 405
def test_auth_user_roles_delete(client):
assert client.delete(api.url_for(UserRolesList, user_id=1), headers=VALID_USER_HEADER_TOKEN).status_code == 405
def test_auth_user_roles_patch(client):
assert client.patch(api.url_for(UserRolesList, user_id=1), data={}, headers=VALID_USER_HEADER_TOKEN).status_code == 405
def test_auth_authority_roles_get(client):
assert client.get(api.url_for(AuthorityRolesList, authority_id=1), headers=VALID_USER_HEADER_TOKEN).status_code == 200
def test_auth_authority_roles_post(client):
assert client.post(api.url_for(AuthorityRolesList, authority_id=1), data={}, headers=VALID_USER_HEADER_TOKEN).status_code == 405
def test_auth_authority_roles_put(client):
assert client.put(api.url_for(AuthorityRolesList, authority_id=1), data={}, headers=VALID_USER_HEADER_TOKEN).status_code == 405
def test_auth_authority_roles_delete(client):
assert client.delete(api.url_for(AuthorityRolesList, authority_id=1), headers=VALID_USER_HEADER_TOKEN).status_code == 405
def test_auth_authority_roles_patch(client):
assert client.patch(api.url_for(AuthorityRolesList, authority_id=1), data={}, headers=VALID_USER_HEADER_TOKEN).status_code == 405
VALID_ADMIN_HEADER_TOKEN = {
'Authorization': 'Basic ' + 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE0MzUyNTAyMTgsInN1YiI6MiwiZXhwIjoxNTIxNTYzODE4fQ.6mbq4-Ro6K5MmuNiTJBB153RDhlM5LGJBjI7GBKkfqA'}
def test_admin_role_get(client):
assert client.get(api.url_for(Roles, role_id=1), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 200
def test_admin_role_post(client):
assert client.post(api.url_for(Roles, role_id=1), data={}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405
def test_admin_role_put(client):
assert client.put(api.url_for(Roles, role_id=1), data={}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 400
def test_admin_role_delete(client):
assert client.delete(api.url_for(Roles, role_id=1), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 200
def test_admin_role_patch(client):
assert client.patch(api.url_for(Roles, role_id=1), data={}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405
def test_admin_roles_get(client):
resp = client.get(api.url_for(RolesList), headers=VALID_ADMIN_HEADER_TOKEN)
assert resp.status_code == 200
assert resp.json['total'] > 0
def test_admin_role_credentials_get(client):
assert client.get(api.url_for(RolesList), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 200
def test_admin_role_credentials_post(client):
assert client.post(api.url_for(RolesList), data={}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 400
def test_admin_role_credentials_put(client):
assert client.put(api.url_for(RolesList), data={}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405
def test_admin_role_credentials_delete(client):
assert client.delete(api.url_for(RolesList), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405
def test_admin_role_credentials_patch(client):
assert client.patch(api.url_for(RolesList), data={}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405
def test_admin_user_roles_get(client):
assert client.get(api.url_for(UserRolesList, user_id=1), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 200
def test_admin_user_roles_post(client):
assert client.post(api.url_for(UserRolesList, user_id=1), data={}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405
def test_admin_user_roles_put(client):
assert client.put(api.url_for(UserRolesList, user_id=1), data={}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405
def test_admin_user_roles_delete(client):
assert client.delete(api.url_for(UserRolesList, user_id=1), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405
def test_admin_user_roles_patch(client):
assert client.patch(api.url_for(UserRolesList, user_id=1), data={}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405
def test_admin_authority_roles_get(client):
assert client.get(api.url_for(AuthorityRolesList, authority_id=1), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 200
def test_admin_authority_roles_post(client):
assert client.post(api.url_for(AuthorityRolesList, authority_id=1), data={}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405
def test_admin_authority_roles_put(client):
assert client.put(api.url_for(AuthorityRolesList, authority_id=1), data={}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405
def test_admin_authority_roles_delete(client):
assert client.delete(api.url_for(AuthorityRolesList, authority_id=1), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405
def test_admin_authority_roles_patch(client):
assert client.patch(api.url_for(AuthorityRolesList, authority_id=1), data={}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405
def test_admin_roles_crud(client):
assert client.post(api.url_for(RolesList), data={}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 400
data = {'name': 'role', 'description': 'test'}
resp = client.post(api.url_for(RolesList), data=dumps(data), content_type='application/json', headers=VALID_ADMIN_HEADER_TOKEN)
assert resp.status_code == 200
role_id = resp.json['id']
assert client.get(api.url_for(Roles, role_id=role_id), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 200
resp = client.get(api.url_for(RolesList), headers=VALID_ADMIN_HEADER_TOKEN)
assert resp.status_code == 200
assert resp.json['total'] == 2
assert client.delete(api.url_for(Roles, role_id=role_id), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 200
resp = client.get(api.url_for(RolesList), headers=VALID_ADMIN_HEADER_TOKEN)
assert resp.status_code == 200
assert resp.json['total'] == 1
@pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405),
('', 405)
])
def test_role_list_patch(client, token, status):
assert client.patch(api.url_for(RolesList), data={}, headers=token).status_code == status