add per user api keys to the backend (#995)

Adds in per user api keys to the backend of lemur.
the basics are:
  - API Keys are really just JWTs with custom second length TTLs.
  - API Keys are provided in the exact same ways JWTs are now.
  - API Keys can be revoked/unrevoked at any time by their creator
    as well as have their TTL Change at anytime.
  - Users can create/view/list their own API Keys at will, and
    an admin role has permission to modify all api keys in the
    instance.

Adds in support for lemur api keys to the frontend of lemur.
doing this required a few changes to the backend as well, but it is
now all working (maybe not the best way though, review will determine
that).

  - fixes inconsistency in moduleauthor name I inputted during the
    first commit.
  - Allows the revoke schema to optionally allow a full api_key object.
  - Adds `/users/:user_id/api_keys/:api_key` and `/users/:user_id/api_keys`
    endpoints.
  - normalizes use of `userId` vs `userId`
  - makes `put` call respond with a JWT so the frontend can show
    the token on updating.
  - adds in the API Key views for clicking "API Keys" on the main nav.
  - adds in the API Key views for clicking into a users edit page.
  - adds tests for the API Key backend views I added.
This commit is contained in:
Eric 2017-12-04 09:50:31 -07:00 committed by kevgliss
parent eb810f1bf0
commit c402f1ff87
35 changed files with 1578 additions and 20 deletions

3
.gitignore vendored
View File

@ -29,3 +29,6 @@ docs/_build
.editorconfig .editorconfig
.idea .idea
lemur/tests/tmp lemur/tests/tmp
/lemur/plugins/lemur_email/tests/expiration-rendered.html
/lemur/plugins/lemur_email/tests/rotation-rendered.html

View File

@ -8,6 +8,8 @@ Changelog
Adds per-certificate rotation policies, requires a database migration. The default rotation policy for all certificates Adds per-certificate rotation policies, requires a database migration. The default rotation policy for all certificates
is 30 days. Every certificate will gain a policy regardless is auto-rotation is used. is 30 days. Every certificate will gain a policy regardless is auto-rotation is used.
Adds per-user API Keys, requires a database migration.
.. note:: This version is not yet released and is under active development .. note:: This version is not yet released and is under active development

View File

@ -26,6 +26,7 @@ from lemur.notifications.views import mod as notifications_bp
from lemur.sources.views import mod as sources_bp from lemur.sources.views import mod as sources_bp
from lemur.endpoints.views import mod as endpoints_bp from lemur.endpoints.views import mod as endpoints_bp
from lemur.logs.views import mod as logs_bp from lemur.logs.views import mod as logs_bp
from lemur.api_keys.views import mod as api_key_bp
from lemur.__about__ import ( from lemur.__about__ import (
__author__, __copyright__, __email__, __license__, __summary__, __title__, __author__, __copyright__, __email__, __license__, __summary__, __title__,
@ -51,7 +52,8 @@ LEMUR_BLUEPRINTS = (
notifications_bp, notifications_bp,
sources_bp, sources_bp,
endpoints_bp, endpoints_bp,
logs_bp logs_bp,
api_key_bp
) )

View File

41
lemur/api_keys/cli.py Normal file
View File

@ -0,0 +1,41 @@
"""
.. module: lemur.api_keys.cli
:platform: Unix
:copyright: (c) 2017 by Netflix Inc., see AUTHORS for more
:license: Apache, see LICENSE for more details.
.. moduleauthor:: Eric Coan <kungfury@instructure.com>
"""
from flask_script import Manager
from lemur.api_keys import service as api_key_service
from lemur.auth.service import create_token
from datetime import datetime
manager = Manager(usage="Handles all api key related tasks.")
@manager.option('-u', '--user-id', dest='uid', help='The User ID this access key belongs too.')
@manager.option('-n', '--name', dest='name', help='The name of this API Key.')
@manager.option('-t', '--ttl', dest='ttl', help='The TTL of this API Key. -1 for forever.')
def create(uid, name, ttl):
"""
Create a new api key for a user.
:return:
"""
print("[+] Creating a new api key.")
key = api_key_service.create(user_id=uid, name=name,
ttl=ttl, issued_at=int(datetime.utcnow().timestamp()), revoked=False)
print("[+] Successfully created a new api key. Generating a JWT...")
jwt = create_token(uid, key.id, key.ttl)
print("[+] Your JWT is: {jwt}".format(jwt=jwt))
@manager.option('-a', '--api-key-id', dest='aid', help='The API Key ID to revoke.')
def revoke(aid):
"""
Revokes an api key for a user.
:return:
"""
print("[-] Revoking the API Key api key.")
api_key_service.revoke(aid=aid)
print("[+] Successfully revoked the api key")

25
lemur/api_keys/models.py Normal file
View File

@ -0,0 +1,25 @@
"""
.. module: lemur.api_keys.models
:platform: Unix
:synopsis: This module contains all of the models need to create an api key within Lemur.
:copyright: (c) 2017 by Netflix Inc., see AUTHORS for more
:license: Apache, see LICENSE for more details.
.. moduleauthor:: Eric Coan <kungfury@instructure.com>
"""
from sqlalchemy import BigInteger, Boolean, Column, ForeignKey, Integer, String
from lemur.database import db
class ApiKey(db.Model):
__tablename__ = 'api_keys'
id = Column(Integer, primary_key=True)
name = Column(String)
user_id = Column(Integer, ForeignKey('users.id'))
ttl = Column(BigInteger)
issued_at = Column(BigInteger)
revoked = Column(Boolean)
def __repr__(self):
return "ApiKey(name={name}, user_id={user_id}, ttl={ttl}, issued_at={iat}, revoked={revoked})".format(
user_id=self.user_id, name=self.name, ttl=self.ttl, iat=self.issued_at, revoked=self.revoked)

51
lemur/api_keys/schemas.py Normal file
View File

@ -0,0 +1,51 @@
"""
.. module: lemur.api_keys.schemas
:platform: Unix
:copyright: (c) 2017 by Netflix Inc., see AUTHORS for more
:license: Apache, see LICENSE for more details.
.. moduleauthor:: Eric Coan <kungfury@instructure.com>
"""
from marshmallow import fields
from lemur.common.schema import LemurInputSchema, LemurOutputSchema
class ApiKeyInputSchema(LemurInputSchema):
name = fields.String(required=False)
user_id = fields.Integer()
ttl = fields.Integer()
class ApiKeyRevokeSchema(LemurInputSchema):
id = fields.Integer(required=False)
name = fields.String()
user_id = fields.Integer(required=False)
revoked = fields.Boolean()
ttl = fields.Integer()
issued_at = fields.Integer(required=False)
class UserApiKeyInputSchema(LemurInputSchema):
name = fields.String(required=False)
ttl = fields.Integer()
class ApiKeyOutputSchema(LemurOutputSchema):
jwt = fields.String()
class ApiKeyDescribedOutputSchema(LemurOutputSchema):
id = fields.Integer()
name = fields.String()
user_id = fields.Integer()
ttl = fields.Integer()
issued_at = fields.Integer()
revoked = fields.Boolean()
api_key_input_schema = ApiKeyInputSchema()
api_key_revoke_schema = ApiKeyRevokeSchema()
api_key_output_schema = ApiKeyOutputSchema()
api_keys_output_schema = ApiKeyDescribedOutputSchema(many=True)
api_key_described_output_schema = ApiKeyDescribedOutputSchema()
user_api_key_input_schema = UserApiKeyInputSchema()

97
lemur/api_keys/service.py Normal file
View File

@ -0,0 +1,97 @@
"""
.. module: lemur.api_keys.service
:platform: Unix
:copyright: (c) 2017 by Netflix Inc., see AUTHORS for more
:license: Apache, see LICENSE for more details.
.. moduleauthor:: Eric Coan <kungfury@instructure.com>
"""
from lemur import database
from lemur.api_keys.models import ApiKey
def get(aid):
"""
Retrieves an api key by its ID.
:param aid: The access key id to get.
:return:
"""
return database.get(ApiKey, aid)
def delete(access_key):
"""
Delete an access key. This is one way to remove a key, though you probably should just set revoked.
:param access_key:
:return:
"""
database.delete(access_key)
def revoke(aid):
"""
Revokes an api key.
:param aid:
:return:
"""
api_key = get(aid)
setattr(api_key, 'revoked', False)
return database.update(api_key)
def get_all_api_keys():
"""
Retrieves all Api Keys.
:return:
"""
return ApiKey.query.all()
def create(**kwargs):
"""
Creates a new API Key.
:param kwargs:
:return:
"""
api_key = ApiKey(**kwargs)
database.create(api_key)
return api_key
def update(api_key, **kwargs):
"""
Updates an api key.
:param api_key:
:param kwargs:
:return:
"""
for key, value in kwargs.items():
setattr(api_key, key, value)
return database.update(api_key)
def render(args):
"""
Helper to parse REST Api requests
:param args:
:return:
"""
query = database.session_query(ApiKey)
user_id = args.pop('user_id', None)
aid = args.pop('id', None)
has_permission = args.pop('has_permission', False)
requesting_user_id = args.pop('requesting_user_id')
if user_id:
query = query.filter(ApiKey.user_id == user_id)
if aid:
query = query.filter(ApiKey.id == aid)
if not has_permission:
query = query.filter(ApiKey.user_id == requesting_user_id)
return database.sort_and_page(query, ApiKey, args)

558
lemur/api_keys/views.py Normal file
View File

@ -0,0 +1,558 @@
"""
.. module: lemur.api_keys.views
:platform: Unix
:copyright: (c) 2017 by Netflix Inc., see AUTHORS for more
:license: Apache, see LICENSE for more details.
.. moduleauthor:: Eric Coan <kungfury@instructure.com>
"""
from datetime import datetime
from flask import Blueprint, g
from flask_restful import reqparse, Api
from lemur.api_keys import service
from lemur.auth.service import AuthenticatedResource, create_token
from lemur.auth.permissions import ApiKeyCreatorPermission
from lemur.common.schema import validate_schema
from lemur.common.utils import paginated_parser
from lemur.api_keys.schemas import api_key_input_schema, api_key_revoke_schema, api_key_output_schema, \
api_keys_output_schema, api_key_described_output_schema, user_api_key_input_schema
mod = Blueprint('api_keys', __name__)
api = Api(mod)
class ApiKeyList(AuthenticatedResource):
""" Defines the 'api_keys' endpoint """
def __init__(self):
super(ApiKeyList, self).__init__()
@validate_schema(None, api_keys_output_schema)
def get(self):
"""
.. http:get:: /keys
The current list of api keys, that you can see.
**Example request**:
.. sourcecode:: http
GET /keys HTTP/1.1
Host: example.com
Accept: application/json, text/javascript
**Example response**:
.. sourcecode:: http
HTTP/1.1 200 OK
Vary: Accept
Content-Type: text/javascript
{
"items": [
{
"id": 1,
"name": "custom name",
"user_id": 1,
"ttl": -1,
"issued_at": 12,
"revoked": false
}
],
"total": 1
}
:query sortBy: field to sort on
:query sortDir: asc or desc
:query page: int default is 1
:query count: count number. default is 10
:query user_id: a user to filter by.
:query id: an access key to filter by.
:reqheader Authorization: OAuth token to authenticate
:statuscode 200: no error
:statuscode 403: unauthenticated
"""
parser = paginated_parser.copy()
args = parser.parse_args()
args['has_permission'] = ApiKeyCreatorPermission().can()
args['requesting_user_id'] = g.current_user.id
return service.render(args)
@validate_schema(api_key_input_schema, api_key_output_schema)
def post(self, data=None):
"""
.. http:post:: /keys
Creates an API Key.
**Example request**:
.. sourcecode:: http
POST /keys HTTP/1.1
Host: example.com
Accept: application/json, text/javascript
{
"name": "my custom name",
"user_id": 1,
"ttl": -1
}
**Example response**:
.. sourcecode:: http
HTTP/1.1 200 OK
Vary: Accept
Content-Type: text/javascript
{
"jwt": ""
}
:reqheader Authorization: OAuth token to authenticate
:statuscode 200: no error
:statuscode 403: unauthenticated
"""
if not ApiKeyCreatorPermission().can():
if data['user_id'] != g.current_user.id:
return dict(message="You are not authorized to create tokens for: {0}".format(data['user_id'])), 403
access_token = service.create(name=data['name'], user_id=data['user_id'], ttl=data['ttl'],
revoked=False, issued_at=int(datetime.utcnow().timestamp()))
return dict(jwt=create_token(access_token.user_id, access_token.id, access_token.ttl))
class ApiKeyUserList(AuthenticatedResource):
""" Defines the 'keys' endpoint on the 'users' endpoint. """
def __init__(self):
super(ApiKeyUserList, self).__init__()
@validate_schema(None, api_keys_output_schema)
def get(self, user_id):
"""
.. http:get:: /users/:user_id/keys
The current list of api keys for a user, that you can see.
**Example request**:
.. sourcecode:: http
GET /users/1/keys HTTP/1.1
Host: example.com
Accept: application/json, text/javascript
**Example response**:
.. sourcecode:: http
HTTP/1.1 200 OK
Vary: Accept
Content-Type: text/javascript
{
"items": [
{
"id": 1,
"name": "custom name",
"user_id": 1,
"ttl": -1,
"issued_at": 12,
"revoked": false
}
],
"total": 1
}
:query sortBy: field to sort on
:query sortDir: asc or desc
:query page: int default is 1
:query count: count number. default is 10
:query id: an access key to filter by.
:reqheader Authorization: OAuth token to authenticate
:statuscode 200: no error
:statuscode 403: unauthenticated
"""
parser = paginated_parser.copy()
args = parser.parse_args()
args['has_permission'] = ApiKeyCreatorPermission().can()
args['requesting_user_id'] = g.current_user.id
args['user_id'] = user_id
return service.render(args)
@validate_schema(user_api_key_input_schema, api_key_output_schema)
def post(self, user_id, data=None):
"""
.. http:post:: /users/:user_id/keys
Creates an API Key for a user.
**Example request**:
.. sourcecode:: http
POST /users/1/keys HTTP/1.1
Host: example.com
Accept: application/json, text/javascript
{
"name": "my custom name"
"ttl": -1
}
**Example response**:
.. sourcecode:: http
HTTP/1.1 200 OK
Vary: Accept
Content-Type: text/javascript
{
"jwt": ""
}
:reqheader Authorization: OAuth token to authenticate
:statuscode 200: no error
:statuscode 403: unauthenticated
"""
if not ApiKeyCreatorPermission().can():
if user_id != g.current_user.id:
return dict(message="You are not authorized to create tokens for: {0}".format(user_id)), 403
access_token = service.create(name=data['name'], user_id=user_id, ttl=data['ttl'],
revoked=False, issued_at=int(datetime.utcnow().timestamp()))
return dict(jwt=create_token(access_token.user_id, access_token.id, access_token.ttl))
class ApiKeys(AuthenticatedResource):
def __init__(self):
self.reqparse = reqparse.RequestParser()
super(ApiKeys, self).__init__()
@validate_schema(None, api_key_output_schema)
def get(self, aid):
"""
.. http:get:: /keys/1
Fetch one api key
**Example request**:
.. sourcecode:: http
GET /keys/1 HTTP/1.1
Host: example.com
Accept: application/json, text/javascript
**Example response**:
.. sourcecode:: http
HTTP/1.1 200 OK
Vary: Accept
Content-Type: text/javascript
{
"jwt": ""
}
:reqheader Authorization: OAuth token to authenticate
:statuscode 200: no error
:statuscode 403: unauthenticated
"""
access_key = service.get(aid)
if access_key is None:
return dict(message="This token does not exist!"), 404
if access_key.user_id != g.current_user.id:
if not ApiKeyCreatorPermission().can():
return dict(message="You are not authorized to view this token!"), 403
return dict(jwt=create_token(access_key.user_id, access_key.id, access_key.ttl))
@validate_schema(api_key_revoke_schema, api_key_output_schema)
def put(self, aid, data=None):
"""
.. http:put:: /keys/1
update one api key
**Example request**:
.. sourcecode:: http
PUT /keys/1 HTTP/1.1
Host: example.com
Accept: application/json, text/javascript
{
"name": "new_name",
"revoked": false,
"ttl": -1
}
**Example response**:
.. sourcecode:: http
HTTP/1.1 200 OK
Vary: Accept
Content-Type: text/javascript
{
"jwt": ""
}
:reqheader Authorization: OAuth token to authenticate
:statuscode 200: no error
:statuscode 403: unauthenticated
"""
access_key = service.get(aid)
if access_key is None:
return dict(message="This token does not exist!"), 404
if access_key.user_id != g.current_user.id:
if not ApiKeyCreatorPermission().can():
return dict(message="You are not authorized to update this token!"), 403
service.update(access_key, name=data['name'], revoked=data['revoked'], ttl=data['ttl'])
return dict(jwt=create_token(access_key.user_id, access_key.id, access_key.ttl))
def delete(self, aid):
"""
.. http:delete:: /keys/1
deletes one api key
**Example request**:
.. sourcecode:: http
DELETE /keys/1 HTTP/1.1
Host: example.com
Accept: application/json, text/javascript
**Example response**:
.. sourcecode:: http
HTTP/1.1 200 OK
Vary: Accept
Content-Type: text/javascript
{
"result": true
}
:reqheader Authorization: OAuth token to authenticate
:statuscode 200: no error
:statuscode 403: unauthenticated
"""
access_key = service.get(aid)
if access_key is None:
return dict(message="This token does not exist!"), 404
if access_key.user_id != g.current_user.id:
if not ApiKeyCreatorPermission().can():
return dict(message="You are not authorized to delete this token!"), 403
service.delete(access_key)
return {'result': True}
class UserApiKeys(AuthenticatedResource):
def __init__(self):
self.reqparse = reqparse.RequestParser()
super(UserApiKeys, self).__init__()
@validate_schema(None, api_key_output_schema)
def get(self, uid, aid):
"""
.. http:get:: /users/1/keys/1
Fetch one api key
**Example request**:
.. sourcecode:: http
GET /users/1/api_keys/1 HTTP/1.1
Host: example.com
Accept: application/json, text/javascript
**Example response**:
.. sourcecode:: http
HTTP/1.1 200 OK
Vary: Accept
Content-Type: text/javascript
{
"jwt": ""
}
:reqheader Authorization: OAuth token to authenticate
:statuscode 200: no error
:statuscode 403: unauthenticated
"""
if uid != g.current_user.id:
if not ApiKeyCreatorPermission().can():
return dict(message="You are not authorized to view this token!"), 403
access_key = service.get(aid)
if access_key is None:
return dict(message="This token does not exist!"), 404
if access_key.user_id != uid:
return dict(message="You are not authorized to view this token!"), 403
return dict(jwt=create_token(access_key.user_id, access_key.id, access_key.ttl))
@validate_schema(api_key_revoke_schema, api_key_output_schema)
def put(self, uid, aid, data=None):
"""
.. http:put:: /users/1/keys/1
update one api key
**Example request**:
.. sourcecode:: http
PUT /users/1/keys/1 HTTP/1.1
Host: example.com
Accept: application/json, text/javascript
{
"name": "new_name",
"revoked": false,
"ttl": -1
}
**Example response**:
.. sourcecode:: http
HTTP/1.1 200 OK
Vary: Accept
Content-Type: text/javascript
{
"jwt": ""
}
:reqheader Authorization: OAuth token to authenticate
:statuscode 200: no error
:statuscode 403: unauthenticated
"""
if uid != g.current_user.id:
if not ApiKeyCreatorPermission().can():
return dict(message="You are not authorized to view this token!"), 403
access_key = service.get(aid)
if access_key is None:
return dict(message="This token does not exist!"), 404
if access_key.user_id != uid:
return dict(message="You are not authorized to update this token!"), 403
service.update(access_key, name=data['name'], revoked=data['revoked'], ttl=data['ttl'])
return dict(jwt=create_token(access_key.user_id, access_key.id, access_key.ttl))
def delete(self, uid, aid):
"""
.. http:delete:: /users/1/keys/1
deletes one api key
**Example request**:
.. sourcecode:: http
DELETE /users/1/keys/1 HTTP/1.1
Host: example.com
Accept: application/json, text/javascript
**Example response**:
.. sourcecode:: http
HTTP/1.1 200 OK
Vary: Accept
Content-Type: text/javascript
{
"result": true
}
:reqheader Authorization: OAuth token to authenticate
:statuscode 200: no error
:statuscode 403: unauthenticated
"""
if uid != g.current_user.id:
if not ApiKeyCreatorPermission().can():
return dict(message="You are not authorized to view this token!"), 403
access_key = service.get(aid)
if access_key is None:
return dict(message="This token does not exist!"), 404
if access_key.user_id != uid:
return dict(message="You are not authorized to delete this token!"), 403
service.delete(access_key)
return {'result': True}
class ApiKeysDescribed(AuthenticatedResource):
def __init__(self):
self.reqparse = reqparse.RequestParser()
super(ApiKeysDescribed, self).__init__()
@validate_schema(None, api_key_described_output_schema)
def get(self, aid):
"""
.. http:get:: /keys/1/described
Fetch one api key
**Example request**:
.. sourcecode:: http
GET /keys/1 HTTP/1.1
Host: example.com
Accept: application/json, text/javascript
**Example response**:
.. sourcecode:: http
HTTP/1.1 200 OK
Vary: Accept
Content-Type: text/javascript
{
"id": 2,
"name": "hoi",
"user_id": 2,
"ttl": -1,
"issued_at": 1222222,
"revoked": false
}
:reqheader Authorization: OAuth token to authenticate
:statuscode 200: no error
:statuscode 403: unauthenticated
"""
access_key = service.get(aid)
if access_key is None:
return dict(message="This token does not exist!"), 404
if access_key.user_id != g.current_user.id:
if not ApiKeyCreatorPermission().can():
return dict(message="You are not authorized to view this token!"), 403
return access_key
api.add_resource(ApiKeyList, '/keys', endpoint='api_keys')
api.add_resource(ApiKeys, '/keys/<int:aid>', endpoint='api_key')
api.add_resource(ApiKeysDescribed, '/keys/<int:aid>/described', endpoint='api_key_described')
api.add_resource(ApiKeyUserList, '/users/<int:user_id>/keys', endpoint='user_api_keys')
api.add_resource(UserApiKeys, '/users/<int:uid>/keys/<int:aid>', endpoint='user_api_key')

View File

@ -33,6 +33,11 @@ class CertificatePermission(Permission):
super(CertificatePermission, self).__init__(*needs) super(CertificatePermission, self).__init__(*needs)
class ApiKeyCreatorPermission(Permission):
def __init__(self):
super(ApiKeyCreatorPermission, self).__init__(RoleNeed('admin'))
RoleMember = namedtuple('role', ['method', 'value']) RoleMember = namedtuple('role', ['method', 'value'])
RoleMemberNeed = partial(RoleMember, 'member') RoleMemberNeed = partial(RoleMember, 'member')

View File

@ -27,6 +27,7 @@ from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicNumbers from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicNumbers
from lemur.users import service as user_service from lemur.users import service as user_service
from lemur.api_keys import service as api_key_service
from lemur.auth.permissions import AuthorityCreatorNeed, RoleMemberNeed from lemur.auth.permissions import AuthorityCreatorNeed, RoleMemberNeed
@ -48,9 +49,9 @@ def get_rsa_public_key(n, e):
) )
def create_token(user): def create_token(user, aid=None, ttl=None):
""" """
Create a valid JWT for a given user, this token is then used to authenticate Create a valid JWT for a given user/api key, this token is then used to authenticate
sessions until the token expires. sessions until the token expires.
:param user: :param user:
@ -58,10 +59,24 @@ def create_token(user):
""" """
expiration_delta = timedelta(days=int(current_app.config.get('LEMUR_TOKEN_EXPIRATION', 1))) expiration_delta = timedelta(days=int(current_app.config.get('LEMUR_TOKEN_EXPIRATION', 1)))
payload = { payload = {
'sub': user.id,
'iat': datetime.utcnow(), 'iat': datetime.utcnow(),
'exp': datetime.utcnow() + expiration_delta 'exp': datetime.utcnow() + expiration_delta
} }
# Handle Just a User ID & User Object.
if isinstance(user, int):
payload['sub'] = user
else:
payload['sub'] = user.id
if aid is not None:
payload['aid'] = aid
# Custom TTLs are only supported on Access Keys.
if ttl is not None and aid is not None:
# Tokens that are forever until revoked.
if ttl == -1:
del payload['exp']
else:
payload['exp'] = ttl
token = jwt.encode(payload, current_app.config['LEMUR_TOKEN_SECRET']) token = jwt.encode(payload, current_app.config['LEMUR_TOKEN_SECRET'])
return token.decode('unicode_escape') return token.decode('unicode_escape')
@ -94,6 +109,16 @@ def login_required(f):
except jwt.InvalidTokenError: except jwt.InvalidTokenError:
return dict(message='Token is invalid'), 403 return dict(message='Token is invalid'), 403
if 'aid' in payload:
access_key = api_key_service.get(payload['aid'])
if access_key.revoked:
return dict(message='Token has been revoked'), 403
if access_key.ttl != -1:
current_time = datetime.utcnow()
expired_time = datetime.fromtimestamp(access_key.issued_at + access_key.ttl)
if current_time >= expired_time:
return dict(message='Token has expired'), 403
user = user_service.get(payload['sub']) user = user_service.get(payload['sub'])
if not user.active: if not user.active:

View File

@ -0,0 +1,31 @@
"""Adds JWT Tokens to Users
Revision ID: c05a8998b371
Revises: ac483cfeb230
Create Date: 2017-11-10 14:51:28.975927
"""
# revision identifiers, used by Alembic.
revision = 'c05a8998b371'
down_revision = 'ac483cfeb230'
from alembic import op
import sqlalchemy as sa
import sqlalchemy_utils
def upgrade():
op.create_table('api_keys',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('name', sa.String(length=128), nullable=True),
sa.Column('user_id', sa.Integer(), nullable=False),
sa.Column('ttl', sa.BigInteger(), nullable=False),
sa.Column('issued_at', sa.BigInteger(), nullable=False),
sa.Column('revoked', sa.Boolean(), nullable=False),
sa.ForeignKeyConstraint(['user_id'], ['users.id'], ),
sa.PrimaryKeyConstraint('id')
)
def downgrade():
op.drop_table('api_keys')

View File

@ -0,0 +1,69 @@
'use strict';
angular.module('lemur')
.controller('ApiKeysCreateController', function ($scope, $uibModalInstance, PluginService, ApiKeyService, LemurRestangular, toaster) {
$scope.apiKey = LemurRestangular.restangularizeElement(null, {}, 'keys');
$scope.save = function (apiKey) {
ApiKeyService.create(apiKey).then(
function (responseBody) {
toaster.pop({
type: 'success',
title: 'Success!',
body: 'Successfully Created JWT!'
});
$scope.jwt = responseBody.jwt;
}, function (response) {
toaster.pop({
type: 'error',
title: apiKey.name || 'Unnamed Api Key',
body: 'lemur-bad-request',
bodyOutputType: 'directive',
directiveData: response.data,
timeout: 100000
});
});
};
$scope.cancel = function () {
$uibModalInstance.dismiss('cancel');
};
$scope.close = function() {
$uibModalInstance.close();
};
})
.controller('ApiKeysEditController', function ($scope, $uibModalInstance, ApiKeyService, LemurRestangular, toaster, editId) {
LemurRestangular.one('keys', editId).customGET('described').then(function(apiKey) {
$scope.apiKey = apiKey;
});
$scope.save = function (apiKey) {
ApiKeyService.update(apiKey).then(
function (responseBody) {
toaster.pop({
type: 'success',
title: 'Success',
body: 'Successfully updated JWT!'
});
$scope.jwt = responseBody.jwt;
}, function (response) {
toaster.pop({
type: 'error',
title: apiKey.name || 'Unnamed API Key',
body: 'lemur-bad-request',
bodyOutputType: 'directive',
directiveData: response.data,
timeout: 100000
});
});
};
$scope.cancel = function () {
$uibModalInstance.dismiss('cancel');
};
$scope.close = function() {
$uibModalInstance.close();
};
});

View File

@ -0,0 +1,54 @@
<div class="modal-header" ng-show="!jwt">
<button type="button" class="close" ng-click="cancel()" aria-label="Close"><span aria-hidden="true">&times;</span></button>
<h3><span ng-show="!apiKey.fromServer">Create</span><span ng-show="apiKey.fromServer">Edit</span></h3>
</div>
<div class="modal-header" ng-show="jwt">
<button type="button" class="close" ng-click="close()" aria-label="Close"><span aria-hidden="true">&times;</span></button>
<h3><span>Token</span></h3>
</div>
<div class="modal-body" ng-show="!jwt">
<form name="createForm" class="form-horizontal" role="form" novalidate>
<div class="form-group"
ng-class="{'has-error': createForm.name.$invalid, 'has-success': !createForm.name.$invalid&&createForm.name.$dirty}">
<label class="control-label col-sm-2">
Name
</label>
<div class="col-sm-10">
<input name="name" ng-model="apiKey.name" placeholder="A Cool API Key" class="form-control" required/>
<p ng-show="createForm.label.$invalid && !createForm.label.$pristine" class="help-block">You must enter an api key name</p>
</div>
</div>
<div class="form-group"
ng-class="{'has-error': createForm.user_id.$invalid, 'has-success': !createForm.user_id.$invalid&&createForm.user_id.$dirty}">
<label class="control-label col-sm-2">
User ID
</label>
<div class="col-sm-10">
<input name="user_id" ng-model="apiKey.userId" placeholder="42" class="form-control" type="number" required/>
<p ng-show="createForm.label.$invalid && !createForm.label.$pristine" class="help-block">You must enter an API Key User ID.</p>
</div>
</div>
<div class="form-group"
ng-class="{'has-error': createForm.ttl.$invalid, 'has-success': !createForm.ttl.$invalid&&createForm.ttl.$dirty}">
<label class="control-label col-sm-2">
TTL
</label>
<div class="col-sm-10">
<input name="ttl" ng-model="apiKey.ttl" placeholder="-1" class="form-control" type="number" required/>
<p ng-show="createForm.label.$invalid && !createForm.label.$pristine" class="help-block">You must enter an API Key TTL.</p>
</div>
</div>
</form>
</div>
<div ng-show="jwt">
<h3>Successfully exported!</h3>
<h4>Your Token is: <pre><code>{{ jwt }}</code></pre></h4>
</div>
<div class="modal-footer" ng-show="!jwt">
<button ng-click="save(apiKey)" type="submit" ng-disabled="createForm.$invalid" class="btn btn-primary">Save</button>
<button ng-click="cancel()" class="btn btn-danger">Cancel</button>
</div>
<div class="modal-footer" ng-show="jwt">
<button ng-click="close()" class="btn btn-primary">Close</button>
</div>

View File

@ -0,0 +1,20 @@
'use strict';
angular.module('lemur')
.service('ApiKeyApi', function (LemurRestangular) {
return LemurRestangular.all('keys');
})
.service('ApiKeyService', function ($location, ApiKeyApi) {
var ApiKeyService = this;
ApiKeyService.update = function(apiKey) {
return apiKey.put();
};
ApiKeyService.create = function (apiKey) {
return ApiKeyApi.post(apiKey);
};
ApiKeyService.delete = function (apiKey) {
return apiKey.remove();
};
});

View File

@ -0,0 +1,104 @@
'use strict';
angular.module('lemur')
.config(function config($stateProvider) {
$stateProvider.state('keys', {
url: '/keys',
templateUrl: '/angular/api_keys/view/view.tpl.html',
controller: 'ApiKeysViewController'
});
})
.controller('ApiKeysViewController', function ($scope, $uibModal, ApiKeyApi, ApiKeyService, ngTableParams, toaster) {
$scope.filter = {};
$scope.apiKeysTable = new ngTableParams({
page: 1, // show first page
count: 10, // count per page
sorting: {
id: 'desc' // initial sorting
},
filter: $scope.filter
}, {
total: 0, // length of data
getData: function ($defer, params) {
ApiKeyApi.getList(params.url()).then(function (data) {
params.total(data.total);
$defer.resolve(data);
});
}
});
$scope.updateRevoked = function (apiKey) {
ApiKeyService.update(apiKey).then(
function () {
toaster.pop({
type: 'success',
title: 'Updated JWT!',
body: apiKey.jwt
});
},
function (response) {
toaster.pop({
type: 'error',
title: apiKey.name || 'Unnamed API Key',
body: 'Unable to update! ' + response.data.message,
timeout: 100000
});
});
};
$scope.create = function () {
var uibModalInstance = $uibModal.open({
animation: true,
controller: 'ApiKeysCreateController',
templateUrl: '/angular/api_keys/api_key/api_key.tpl.html',
size: 'lg',
backdrop: 'static'
});
uibModalInstance.result.then(function () {
$scope.apiKeysTable.reload();
});
};
$scope.remove = function (apiKey) {
apiKey.remove().then(
function () {
toaster.pop({
type: 'success',
title: 'Removed!',
body: 'Deleted that API Key!'
});
$scope.apiKeysTable.reload();
},
function (response) {
toaster.pop({
type: 'error',
title: 'Opps',
body: 'I see what you did there: ' + response.data.message
});
}
);
};
$scope.edit = function (apiKeyId) {
var uibModalInstance = $uibModal.open({
animation: true,
templateUrl: '/angular/api_keys/api_key/api_key.tpl.html',
controller: 'ApiKeysEditController',
size: 'lg',
backdrop: 'static',
resolve: {
editId: function () {
return apiKeyId;
}
}
});
uibModalInstance.result.then(function () {
$scope.apiKeysTable.reload();
});
};
});

View File

@ -0,0 +1,50 @@
<div class="row">
<div class="col-md-12">
<h2 class="featurette-heading">API Keys
<span class="text-muted"><small>For accidentally uploading to github</small></span></h2>
<div class="panel panel-default">
<div class="panel-heading">
<div class="btn-group pull-right">
<button ng-click="create()" class="btn btn-primary">Create</button>
</div>
<div class="clearfix"></div>
</div>
<div class="table-responsive">
<table ng-table="apiKeysTable" class="table table-striped" template-pagination="angular/pager.html">
<tbody>
<tr ng-repeat="apiKey in $data track by $index">
<td data-title="'ID'" align="center">
{{ apiKey.id }}
</td>
<td data-title="'Name'" sortable="'name'" align="center">
{{ apiKey.name || 'Unnamed Api Key' }}
</td>
<td data-title="'User ID'" sortable="'user_id'" align="center">
{{ apiKey.userId }}
</td>
<td data-title="'TTL'" align="center">
{{ apiKey.ttl == -1 ? 'Forever' : apiKey.ttl }}
</td>
<td data-title="'Revoked'" align="center">
<form>
<switch ng-change="updateRevoked(apiKey)" id="status" name="status"
ng-model="apiKey.revoked" class="green small"></switch>
</form>
</td>
<td data-title="''">
<div class="btn-group-vertical pull-right">
<button uib-tooltip="Edit Key" ng-click="edit(apiKey.id)" class="btn btn-sm btn-info">
Edit
</button>
<button uib-tooltip="Delete Key" ng-click="remove(apiKey)" type="button" class="btn btn-sm btn-danger pull-left">
Remove
</button>
</div>
</td>
</tr>
</tbody>
</table>
</div>
</div>
</div>
</div>

View File

@ -3,7 +3,7 @@
*/ */
'use strict'; 'use strict';
angular.module('lemur') angular.module('lemur')
.service('UserApi', function (LemurRestangular) { .service('UserApi', function (LemurRestangular, ApiKeyService) {
LemurRestangular.extendModel('users', function (obj) { LemurRestangular.extendModel('users', function (obj) {
return angular.extend(obj, { return angular.extend(obj, {
attachRole: function (role) { attachRole: function (role) {
@ -15,6 +15,11 @@ angular.module('lemur')
}, },
removeRole: function (index) { removeRole: function (index) {
this.roles.splice(index, 1); this.roles.splice(index, 1);
},
removeApiKey: function (index) {
var removedApiKeys = this.apiKeys.splice(index, 1);
var removedApiKey = removedApiKeys[0];
return ApiKeyService.delete(removedApiKey);
} }
}); });
}); });
@ -41,6 +46,12 @@ angular.module('lemur')
}); });
}; };
UserService.getApiKeys = function (user) {
user.getList('keys').then(function (apiKeys) {
user.apiKeys = apiKeys;
});
};
UserService.loadMoreRoles = function (user, page) { UserService.loadMoreRoles = function (user, page) {
user.getList('roles', {page: page}).then(function (roles) { user.getList('roles', {page: page}).then(function (roles) {
_.each(roles, function (role) { _.each(roles, function (role) {
@ -49,6 +60,14 @@ angular.module('lemur')
}); });
}; };
UserService.loadMoreApiKeys = function (user, page) {
user.getList('keys', {page: page}).then(function (apiKeys) {
_.each(apiKeys, function (apiKey) {
user.apiKeys.push(apiKey);
});
});
};
UserService.create = function (user) { UserService.create = function (user) {
return UserApi.post(user); return UserApi.post(user);
}; };

View File

@ -2,14 +2,17 @@
angular.module('lemur') angular.module('lemur')
.controller('UsersEditController', function ($scope, $uibModalInstance, UserApi, UserService, RoleService, toaster, editId) { .controller('UsersEditController', function ($scope, $uibModalInstance, UserApi, UserService, RoleService, ApiKeyService, toaster, editId) {
UserApi.get(editId).then(function (user) { UserApi.get(editId).then(function (user) {
$scope.user = user; $scope.user = user;
UserService.getApiKeys(user);
}); });
$scope.roleService = RoleService; $scope.roleService = RoleService;
$scope.apiKeyService = ApiKeyService;
$scope.rolePage = 1; $scope.rolePage = 1;
$scope.apiKeyPage = 1;
$scope.save = function (user) { $scope.save = function (user) {
UserService.update(user).then( UserService.update(user).then(
@ -36,15 +39,40 @@ angular.module('lemur')
$uibModalInstance.dismiss('cancel'); $uibModalInstance.dismiss('cancel');
}; };
$scope.removeApiKey = function (idx) {
UserApi.removeApiKey(idx).then(function () {
toaster.pop({
type: 'success',
title: 'Removed API Key!',
body: 'Successfully removed the api key!'
});
}, function(err) {
toaster.pop({
type: 'error',
title: 'Failed to remove API Key!',
body: 'lemur-bad-request',
bodyOutputType: 'directive',
directiveData: err,
timeout: 100000
});
});
};
$scope.loadMoreRoles = function () { $scope.loadMoreRoles = function () {
$scope.rolePage += 1; $scope.rolePage += 1;
UserService.loadMoreRoles($scope.user, $scope.rolePage); UserService.loadMoreRoles($scope.user, $scope.rolePage);
}; };
$scope.loadMoreApiKeys = function () {
$scope.apiKeyPage += 1;
UserService.loadMoreApiKeys($scope.user, $scope.apiKeyPage);
};
}) })
.controller('UsersCreateController', function ($scope, $uibModalInstance, UserService, LemurRestangular, RoleService, toaster) { .controller('UsersCreateController', function ($scope, $uibModalInstance, UserService, LemurRestangular, RoleService, ApiKeyService, toaster) {
$scope.user = LemurRestangular.restangularizeElement(null, {}, 'users'); $scope.user = LemurRestangular.restangularizeElement(null, {}, 'users');
$scope.roleService = RoleService; $scope.roleService = RoleService;
$scope.apiKeyService = ApiKeyService;
$scope.save = function (user) { $scope.save = function (user) {
UserService.create(user).then( UserService.create(user).then(

View File

@ -85,6 +85,27 @@
</table> </table>
</div> </div>
</div> </div>
<div class="form-group">
<label class="control-label col-sm-2">
API Keys
</label>
<div class="col-sm-10">
<table ng-show="user.apiKeys" class="table">
<tr ng-repeat="apiKey in user.apiKeys track by $index">
<td><a class="btn btn-sm btn-info" href="#/keys/{{ apiKey.id }}/edit">{{ apiKey.name || "Unnamed Api Key" }}</a></td>
<td>
<button type="button" ng-click="user.removeApiKey($index)" class="btn btn-danger btn-sm pull-right">Remove
</button>
</td>
</tr>
<tr>
<td></td>
<td></td>
<td><a class="pull-right" ng-click="loadMoreApiKeys()"><strong>More</strong></a></td>
</tr>
</table>
</div>
</div>
</form> </form>
<div class="modal-footer"> <div class="modal-footer">
<button ng-click="save(user)" type="submit" ng-disabled="createForm.$invalid" class="btn btn-primary">Save</button> <button ng-click="save(user)" type="submit" ng-disabled="createForm.$invalid" class="btn btn-primary">Save</button>

View File

@ -63,6 +63,7 @@
<li><a ui-sref="users">Users</a></li> <li><a ui-sref="users">Users</a></li>
<li><a ui-sref="domains">Domains</a></li> <li><a ui-sref="domains">Domains</a></li>
<li><a ui-sref="logs">Logs</a></li> <li><a ui-sref="logs">Logs</a></li>
<li><a ui-sref="keys">Api Keys</a></li>
</ul> </ul>
</li> </li>
</ul> </ul>

View File

@ -7,7 +7,7 @@ from lemur import create_app
from lemur.database import db as _db from lemur.database import db as _db
from lemur.auth.service import create_token from lemur.auth.service import create_token
from .factories import AuthorityFactory, NotificationFactory, DestinationFactory, \ from .factories import ApiKeyFactory, AuthorityFactory, NotificationFactory, DestinationFactory, \
CertificateFactory, UserFactory, RoleFactory, SourceFactory, EndpointFactory, RotationPolicyFactory CertificateFactory, UserFactory, RoleFactory, SourceFactory, EndpointFactory, RotationPolicyFactory
@ -52,8 +52,9 @@ def db(app, request):
UserFactory() UserFactory()
r = RoleFactory(name='admin') r = RoleFactory(name='admin')
UserFactory(roles=[r]) u = UserFactory(roles=[r])
rp = RotationPolicyFactory(name='default') rp = RotationPolicyFactory(name='default')
ApiKeyFactory(user=u)
_db.session.commit() _db.session.commit()
yield _db yield _db

View File

@ -16,6 +16,7 @@ from lemur.users.models import User
from lemur.roles.models import Role from lemur.roles.models import Role
from lemur.endpoints.models import Policy, Endpoint from lemur.endpoints.models import Policy, Endpoint
from lemur.policies.models import RotationPolicy from lemur.policies.models import RotationPolicy
from lemur.api_keys.models import ApiKey
from .vectors import INTERNAL_VALID_SAN_STR, PRIVATE_KEY_STR from .vectors import INTERNAL_VALID_SAN_STR, PRIVATE_KEY_STR
@ -260,3 +261,23 @@ class EndpointFactory(BaseFactory):
class Meta: class Meta:
"""Factory Configuration.""" """Factory Configuration."""
model = Endpoint model = Endpoint
class ApiKeyFactory(BaseFactory):
"""Api Key Factory."""
name = Sequence(lambda n: 'api_key_{0}'.format(n))
revoked = False
ttl = -1
issued_at = 1
class Meta:
"""Factory Configuration."""
model = ApiKey
@post_generation
def user(self, create, extracted, **kwargs):
if not create:
return
if extracted:
self.userId = extracted.id

View File

@ -0,0 +1,222 @@
import json
import pytest
from lemur.api_keys.views import * # noqa
from .vectors import VALID_ADMIN_API_TOKEN, VALID_ADMIN_HEADER_TOKEN, VALID_USER_HEADER_TOKEN
@pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 200),
(VALID_ADMIN_HEADER_TOKEN, 200),
(VALID_ADMIN_API_TOKEN, 200),
('', 401)
])
def test_api_key_list_get(client, token, status):
assert client.get(api.url_for(ApiKeyList), headers=token).status_code == status
@pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 400),
(VALID_ADMIN_HEADER_TOKEN, 400),
(VALID_ADMIN_API_TOKEN, 400),
('', 401)
])
def test_api_key_list_post_invalid(client, token, status):
assert client.post(api.url_for(ApiKeyList), data={}, headers=token).status_code == status
@pytest.mark.parametrize("token,user_id,status", [
(VALID_USER_HEADER_TOKEN, 1, 200),
(VALID_ADMIN_HEADER_TOKEN, 2, 200),
(VALID_ADMIN_API_TOKEN, 2, 200),
('', 0, 401)
])
def test_api_key_list_post_valid_self(client, user_id, token, status):
assert client.post(api.url_for(ApiKeyList), data=json.dumps({'name': 'a test token', 'userId': user_id, 'ttl': -1}), headers=token).status_code == status
@pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 403),
(VALID_ADMIN_HEADER_TOKEN, 200),
(VALID_ADMIN_API_TOKEN, 200),
('', 401)
])
def test_api_key_list_post_valid_no_permission(client, token, status):
assert client.post(api.url_for(ApiKeyList), data=json.dumps({'name': 'a test token', 'userId': 2, 'ttl': -1}), headers=token).status_code == status
@pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405)
])
def test_api_key_list_patch(client, token, status):
assert client.patch(api.url_for(ApiKeyList), data={}, headers=token).status_code == status
@pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405)
])
def test_api_key_list_delete(client, token, status):
assert client.delete(api.url_for(ApiKeyList), headers=token).status_code == status
@pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 200),
(VALID_ADMIN_HEADER_TOKEN, 200),
(VALID_ADMIN_API_TOKEN, 200),
('', 401)
])
def test_user_api_key_list_get(client, token, status):
assert client.get(api.url_for(ApiKeyUserList, user_id=1), headers=token).status_code == status
@pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 400),
(VALID_ADMIN_HEADER_TOKEN, 400),
(VALID_ADMIN_API_TOKEN, 400),
('', 401)
])
def test_user_api_key_list_post_invalid(client, token, status):
assert client.post(api.url_for(ApiKeyUserList, user_id=1), data={}, headers=token).status_code == status
@pytest.mark.parametrize("token,user_id,status", [
(VALID_USER_HEADER_TOKEN, 1, 200),
(VALID_ADMIN_HEADER_TOKEN, 2, 200),
(VALID_ADMIN_API_TOKEN, 2, 200),
('', 0, 401)
])
def test_user_api_key_list_post_valid_self(client, user_id, token, status):
assert client.post(api.url_for(ApiKeyUserList, user_id=1), data=json.dumps({'name': 'a test token', 'userId': user_id, 'ttl': -1}), headers=token).status_code == status
@pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 403),
(VALID_ADMIN_HEADER_TOKEN, 200),
(VALID_ADMIN_API_TOKEN, 200),
('', 401)
])
def test_user_api_key_list_post_valid_no_permission(client, token, status):
assert client.post(api.url_for(ApiKeyUserList, user_id=2), data=json.dumps({'name': 'a test token', 'userId': 2, 'ttl': -1}), headers=token).status_code == status
@pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405)
])
def test_user_api_key_list_patch(client, token, status):
assert client.patch(api.url_for(ApiKeyUserList, user_id=1), data={}, headers=token).status_code == status
@pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405)
])
def test_user_api_key_list_delete(client, token, status):
assert client.delete(api.url_for(ApiKeyUserList, user_id=1), headers=token).status_code == status
@pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 403),
(VALID_ADMIN_HEADER_TOKEN, 200),
(VALID_ADMIN_API_TOKEN, 200),
('', 401)
])
@pytest.mark.skip(reason="no way of getting an actual user onto the access key to generate a jwt")
def test_api_key_get(client, token, status):
assert client.get(api.url_for(ApiKeys, aid=1), headers=token).status_code == status
@pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405)
])
def test_api_key_post(client, token, status):
assert client.post(api.url_for(ApiKeys, aid=1), headers=token).status_code == status
@pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405)
])
def test_api_key_patch(client, token, status):
assert client.patch(api.url_for(ApiKeys, aid=1), headers=token).status_code == status
@pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 403),
(VALID_ADMIN_HEADER_TOKEN, 200),
(VALID_ADMIN_API_TOKEN, 200),
('', 401)
])
@pytest.mark.skip(reason="no way of getting an actual user onto the access key to generate a jwt")
def test_api_key_put_permssions(client, token, status):
assert client.put(api.url_for(ApiKeys, aid=1), data=json.dumps({'name': 'Test', 'revoked': False, 'ttl': -1}), headers=token).status_code == status
# This test works while the other doesn't because the schema allows user id to be null.
@pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 403),
(VALID_ADMIN_HEADER_TOKEN, 200),
(VALID_ADMIN_API_TOKEN, 200),
('', 401)
])
def test_api_key_described_get(client, token, status):
assert client.get(api.url_for(ApiKeysDescribed, aid=1), headers=token).status_code == status
@pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 200),
(VALID_ADMIN_HEADER_TOKEN, 200),
(VALID_ADMIN_API_TOKEN, 200),
('', 401)
])
@pytest.mark.skip(reason="no way of getting an actual user onto the access key to generate a jwt")
def test_user_api_key_get(client, token, status):
assert client.get(api.url_for(UserApiKeys, uid=1, aid=1), headers=token).status_code == status
@pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405)
])
def test_user_api_key_post(client, token, status):
assert client.post(api.url_for(UserApiKeys, uid=2, aid=1), data={}, headers=token).status_code == status
@pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405)
])
def test_user_api_key_patch(client, token, status):
assert client.patch(api.url_for(UserApiKeys, uid=2, aid=1), data={}, headers=token).status_code == status
@pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 403),
(VALID_ADMIN_HEADER_TOKEN, 200),
(VALID_ADMIN_API_TOKEN, 200),
('', 401)
])
@pytest.mark.skip(reason="no way of getting an actual user onto the access key to generate a jwt")
def test_user_api_key_put_permssions(client, token, status):
assert client.put(api.url_for(UserApiKeys, uid=2, aid=1), data=json.dumps({'name': 'Test', 'revoked': False, 'ttl': -1}), headers=token).status_code == status

View File

@ -4,7 +4,7 @@ import pytest
from lemur.authorities.views import * # noqa from lemur.authorities.views import * # noqa
from lemur.tests.factories import AuthorityFactory, RoleFactory from lemur.tests.factories import AuthorityFactory, RoleFactory
from lemur.tests.vectors import VALID_ADMIN_HEADER_TOKEN, VALID_USER_HEADER_TOKEN from lemur.tests.vectors import VALID_ADMIN_API_TOKEN, VALID_ADMIN_HEADER_TOKEN, VALID_USER_HEADER_TOKEN
def test_authority_input_schema(client, role, issuer_plugin, logged_in_user): def test_authority_input_schema(client, role, issuer_plugin, logged_in_user):
@ -47,7 +47,8 @@ def test_create_authority(issuer_plugin, user):
@pytest.mark.parametrize("token, count", [ @pytest.mark.parametrize("token, count", [
(VALID_USER_HEADER_TOKEN, 0), (VALID_USER_HEADER_TOKEN, 0),
(VALID_ADMIN_HEADER_TOKEN, 3) (VALID_ADMIN_HEADER_TOKEN, 3),
(VALID_ADMIN_API_TOKEN, 3),
]) ])
def test_admin_authority(client, authority, issuer_plugin, token, count): def test_admin_authority(client, authority, issuer_plugin, token, count):
assert client.get(api.url_for(AuthoritiesList), headers=token).json['total'] == count assert client.get(api.url_for(AuthoritiesList), headers=token).json['total'] == count
@ -56,6 +57,7 @@ def test_admin_authority(client, authority, issuer_plugin, token, count):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 200), (VALID_USER_HEADER_TOKEN, 200),
(VALID_ADMIN_HEADER_TOKEN, 200), (VALID_ADMIN_HEADER_TOKEN, 200),
(VALID_ADMIN_API_TOKEN, 200),
('', 401) ('', 401)
]) ])
def test_authority_get(client, token, status): def test_authority_get(client, token, status):
@ -65,6 +67,7 @@ def test_authority_get(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405), (VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405), (VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405) ('', 405)
]) ])
def test_authority_post(client, token, status): def test_authority_post(client, token, status):
@ -74,6 +77,7 @@ def test_authority_post(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 400), (VALID_USER_HEADER_TOKEN, 400),
(VALID_ADMIN_HEADER_TOKEN, 400), (VALID_ADMIN_HEADER_TOKEN, 400),
(VALID_ADMIN_API_TOKEN, 400),
('', 401) ('', 401)
]) ])
def test_authority_put(client, token, status): def test_authority_put(client, token, status):
@ -83,6 +87,7 @@ def test_authority_put(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405), (VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405), (VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405) ('', 405)
]) ])
def test_authority_delete(client, token, status): def test_authority_delete(client, token, status):
@ -92,6 +97,7 @@ def test_authority_delete(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405), (VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405), (VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405) ('', 405)
]) ])
def test_authority_patch(client, token, status): def test_authority_patch(client, token, status):
@ -101,6 +107,7 @@ def test_authority_patch(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 200), (VALID_USER_HEADER_TOKEN, 200),
(VALID_ADMIN_HEADER_TOKEN, 200), (VALID_ADMIN_HEADER_TOKEN, 200),
(VALID_ADMIN_API_TOKEN, 200),
('', 401) ('', 401)
]) ])
def test_authorities_get(client, token, status): def test_authorities_get(client, token, status):
@ -110,6 +117,7 @@ def test_authorities_get(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 400), (VALID_USER_HEADER_TOKEN, 400),
(VALID_ADMIN_HEADER_TOKEN, 400), (VALID_ADMIN_HEADER_TOKEN, 400),
(VALID_ADMIN_API_TOKEN, 400),
('', 401) ('', 401)
]) ])
def test_authorities_post(client, token, status): def test_authorities_post(client, token, status):
@ -119,6 +127,7 @@ def test_authorities_post(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405), (VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405), (VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405) ('', 405)
]) ])
def test_authorities_put(client, token, status): def test_authorities_put(client, token, status):
@ -128,6 +137,7 @@ def test_authorities_put(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405), (VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405), (VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405) ('', 405)
]) ])
def test_authorities_delete(client, token, status): def test_authorities_delete(client, token, status):
@ -137,6 +147,7 @@ def test_authorities_delete(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405), (VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405), (VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405) ('', 405)
]) ])
def test_authorities_patch(client, token, status): def test_authorities_patch(client, token, status):
@ -146,6 +157,7 @@ def test_authorities_patch(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 200), (VALID_USER_HEADER_TOKEN, 200),
(VALID_ADMIN_HEADER_TOKEN, 200), (VALID_ADMIN_HEADER_TOKEN, 200),
(VALID_ADMIN_API_TOKEN, 200),
('', 401) ('', 401)
]) ])
def test_certificate_authorities_get(client, token, status): def test_certificate_authorities_get(client, token, status):
@ -155,6 +167,7 @@ def test_certificate_authorities_get(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 400), (VALID_USER_HEADER_TOKEN, 400),
(VALID_ADMIN_HEADER_TOKEN, 400), (VALID_ADMIN_HEADER_TOKEN, 400),
(VALID_ADMIN_API_TOKEN, 400),
('', 401) ('', 401)
]) ])
def test_certificate_authorities_post(client, token, status): def test_certificate_authorities_post(client, token, status):
@ -164,6 +177,7 @@ def test_certificate_authorities_post(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405), (VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405), (VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405) ('', 405)
]) ])
def test_certificate_authorities_put(client, token, status): def test_certificate_authorities_put(client, token, status):
@ -173,6 +187,7 @@ def test_certificate_authorities_put(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405), (VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405), (VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405) ('', 405)
]) ])
def test_certificate_authorities_delete(client, token, status): def test_certificate_authorities_delete(client, token, status):
@ -182,6 +197,7 @@ def test_certificate_authorities_delete(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405), (VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405), (VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405) ('', 405)
]) ])
def test_certificate_authorities_patch(client, token, status): def test_certificate_authorities_patch(client, token, status):

View File

@ -15,7 +15,7 @@ from lemur.certificates.views import * # noqa
from lemur.domains.models import Domain from lemur.domains.models import Domain
from lemur.tests.vectors import VALID_ADMIN_HEADER_TOKEN, VALID_USER_HEADER_TOKEN, CSR_STR, \ from lemur.tests.vectors import VALID_ADMIN_API_TOKEN, VALID_ADMIN_HEADER_TOKEN, VALID_USER_HEADER_TOKEN, CSR_STR, \
INTERNAL_VALID_LONG_STR, INTERNAL_VALID_SAN_STR, PRIVATE_KEY_STR INTERNAL_VALID_LONG_STR, INTERNAL_VALID_SAN_STR, PRIVATE_KEY_STR
@ -506,6 +506,7 @@ def test_upload_private_key_str(user):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 200), (VALID_USER_HEADER_TOKEN, 200),
(VALID_ADMIN_HEADER_TOKEN, 200), (VALID_ADMIN_HEADER_TOKEN, 200),
(VALID_ADMIN_API_TOKEN, 200),
('', 401) ('', 401)
]) ])
def test_certificate_get_private_key(client, token, status): def test_certificate_get_private_key(client, token, status):
@ -515,6 +516,7 @@ def test_certificate_get_private_key(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 200), (VALID_USER_HEADER_TOKEN, 200),
(VALID_ADMIN_HEADER_TOKEN, 200), (VALID_ADMIN_HEADER_TOKEN, 200),
(VALID_ADMIN_API_TOKEN, 200),
('', 401) ('', 401)
]) ])
def test_certificate_get(client, token, status): def test_certificate_get(client, token, status):
@ -530,6 +532,7 @@ def test_certificate_get_body(client):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405), (VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405), (VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405) ('', 405)
]) ])
def test_certificate_post(client, token, status): def test_certificate_post(client, token, status):
@ -539,6 +542,7 @@ def test_certificate_post(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 400), (VALID_USER_HEADER_TOKEN, 400),
(VALID_ADMIN_HEADER_TOKEN, 400), (VALID_ADMIN_HEADER_TOKEN, 400),
(VALID_ADMIN_API_TOKEN, 400),
('', 401) ('', 401)
]) ])
def test_certificate_put(client, token, status): def test_certificate_put(client, token, status):
@ -553,6 +557,7 @@ def test_certificate_put_with_data(client, certificate, issuer_plugin):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405), (VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405), (VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405) ('', 405)
]) ])
def test_certificate_delete(client, token, status): def test_certificate_delete(client, token, status):
@ -562,6 +567,7 @@ def test_certificate_delete(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405), (VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405), (VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405) ('', 405)
]) ])
def test_certificate_patch(client, token, status): def test_certificate_patch(client, token, status):
@ -571,6 +577,7 @@ def test_certificate_patch(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 200), (VALID_USER_HEADER_TOKEN, 200),
(VALID_ADMIN_HEADER_TOKEN, 200), (VALID_ADMIN_HEADER_TOKEN, 200),
(VALID_ADMIN_API_TOKEN, 200),
('', 401) ('', 401)
]) ])
def test_certificates_get(client, token, status): def test_certificates_get(client, token, status):
@ -580,6 +587,7 @@ def test_certificates_get(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 400), (VALID_USER_HEADER_TOKEN, 400),
(VALID_ADMIN_HEADER_TOKEN, 400), (VALID_ADMIN_HEADER_TOKEN, 400),
(VALID_ADMIN_API_TOKEN, 400),
('', 401) ('', 401)
]) ])
def test_certificates_post(client, token, status): def test_certificates_post(client, token, status):
@ -589,6 +597,7 @@ def test_certificates_post(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405), (VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405), (VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405) ('', 405)
]) ])
def test_certificates_put(client, token, status): def test_certificates_put(client, token, status):
@ -598,6 +607,7 @@ def test_certificates_put(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405), (VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405), (VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405) ('', 405)
]) ])
def test_certificates_delete(client, token, status): def test_certificates_delete(client, token, status):
@ -607,6 +617,7 @@ def test_certificates_delete(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405), (VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405), (VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405) ('', 405)
]) ])
def test_certificates_patch(client, token, status): def test_certificates_patch(client, token, status):
@ -616,6 +627,7 @@ def test_certificates_patch(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405), (VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405), (VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405) ('', 405)
]) ])
def test_certificate_credentials_post(client, token, status): def test_certificate_credentials_post(client, token, status):
@ -625,6 +637,7 @@ def test_certificate_credentials_post(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405), (VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405), (VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405) ('', 405)
]) ])
def test_certificate_credentials_put(client, token, status): def test_certificate_credentials_put(client, token, status):
@ -634,6 +647,7 @@ def test_certificate_credentials_put(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405), (VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405), (VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405) ('', 405)
]) ])
def test_certificate_credentials_delete(client, token, status): def test_certificate_credentials_delete(client, token, status):
@ -643,6 +657,7 @@ def test_certificate_credentials_delete(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405), (VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405), (VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405) ('', 405)
]) ])
def test_certificate_credentials_patch(client, token, status): def test_certificate_credentials_patch(client, token, status):
@ -652,6 +667,7 @@ def test_certificate_credentials_patch(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405), (VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405), (VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405) ('', 405)
]) ])
def test_certificates_upload_get(client, token, status): def test_certificates_upload_get(client, token, status):
@ -661,6 +677,7 @@ def test_certificates_upload_get(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 400), (VALID_USER_HEADER_TOKEN, 400),
(VALID_ADMIN_HEADER_TOKEN, 400), (VALID_ADMIN_HEADER_TOKEN, 400),
(VALID_ADMIN_API_TOKEN, 400),
('', 401) ('', 401)
]) ])
def test_certificates_upload_post(client, token, status): def test_certificates_upload_post(client, token, status):
@ -670,6 +687,7 @@ def test_certificates_upload_post(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405), (VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405), (VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405) ('', 405)
]) ])
def test_certificates_upload_put(client, token, status): def test_certificates_upload_put(client, token, status):
@ -679,6 +697,7 @@ def test_certificates_upload_put(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405), (VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405), (VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405) ('', 405)
]) ])
def test_certificates_upload_delete(client, token, status): def test_certificates_upload_delete(client, token, status):
@ -688,6 +707,7 @@ def test_certificates_upload_delete(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405), (VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405), (VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405) ('', 405)
]) ])
def test_certificates_upload_patch(client, token, status): def test_certificates_upload_patch(client, token, status):

View File

@ -3,7 +3,7 @@ import pytest
from lemur.destinations.views import * # noqa from lemur.destinations.views import * # noqa
from .vectors import VALID_ADMIN_HEADER_TOKEN, VALID_USER_HEADER_TOKEN from .vectors import VALID_ADMIN_API_TOKEN, VALID_ADMIN_HEADER_TOKEN, VALID_USER_HEADER_TOKEN
def test_destination_input_schema(client, destination_plugin, destination): def test_destination_input_schema(client, destination_plugin, destination):
@ -27,6 +27,7 @@ def test_destination_input_schema(client, destination_plugin, destination):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 404), (VALID_USER_HEADER_TOKEN, 404),
(VALID_ADMIN_HEADER_TOKEN, 404), (VALID_ADMIN_HEADER_TOKEN, 404),
(VALID_ADMIN_API_TOKEN, 404),
('', 401) ('', 401)
]) ])
def test_destination_get(client, token, status): def test_destination_get(client, token, status):
@ -36,6 +37,7 @@ def test_destination_get(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405), (VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405), (VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405) ('', 405)
]) ])
def test_destination_post_(client, token, status): def test_destination_post_(client, token, status):
@ -45,6 +47,7 @@ def test_destination_post_(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 403), (VALID_USER_HEADER_TOKEN, 403),
(VALID_ADMIN_HEADER_TOKEN, 400), (VALID_ADMIN_HEADER_TOKEN, 400),
(VALID_ADMIN_API_TOKEN, 400),
('', 401) ('', 401)
]) ])
def test_destination_put(client, token, status): def test_destination_put(client, token, status):
@ -54,6 +57,7 @@ def test_destination_put(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 403), (VALID_USER_HEADER_TOKEN, 403),
(VALID_ADMIN_HEADER_TOKEN, 200), (VALID_ADMIN_HEADER_TOKEN, 200),
(VALID_ADMIN_API_TOKEN, 200),
('', 401) ('', 401)
]) ])
def test_destination_delete(client, token, status): def test_destination_delete(client, token, status):
@ -63,6 +67,7 @@ def test_destination_delete(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405), (VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405), (VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405) ('', 405)
]) ])
def test_destination_patch(client, token, status): def test_destination_patch(client, token, status):
@ -72,6 +77,7 @@ def test_destination_patch(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 403), (VALID_USER_HEADER_TOKEN, 403),
(VALID_ADMIN_HEADER_TOKEN, 400), (VALID_ADMIN_HEADER_TOKEN, 400),
(VALID_ADMIN_API_TOKEN, 400),
('', 401) ('', 401)
]) ])
def test_destination_list_post_(client, token, status): def test_destination_list_post_(client, token, status):
@ -81,6 +87,7 @@ def test_destination_list_post_(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 200), (VALID_USER_HEADER_TOKEN, 200),
(VALID_ADMIN_HEADER_TOKEN, 200), (VALID_ADMIN_HEADER_TOKEN, 200),
(VALID_ADMIN_API_TOKEN, 200),
('', 401) ('', 401)
]) ])
def test_destination_list_get(client, token, status): def test_destination_list_get(client, token, status):
@ -90,6 +97,7 @@ def test_destination_list_get(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405), (VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405), (VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405) ('', 405)
]) ])
def test_destination_list_delete(client, token, status): def test_destination_list_delete(client, token, status):
@ -99,6 +107,7 @@ def test_destination_list_delete(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405), (VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405), (VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405) ('', 405)
]) ])
def test_destination_list_patch(client, token, status): def test_destination_list_patch(client, token, status):

View File

@ -3,12 +3,13 @@ import pytest
from lemur.domains.views import * # noqa from lemur.domains.views import * # noqa
from .vectors import VALID_ADMIN_HEADER_TOKEN, VALID_USER_HEADER_TOKEN from .vectors import VALID_ADMIN_API_TOKEN, VALID_ADMIN_HEADER_TOKEN, VALID_USER_HEADER_TOKEN
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 200), (VALID_USER_HEADER_TOKEN, 200),
(VALID_ADMIN_HEADER_TOKEN, 200), (VALID_ADMIN_HEADER_TOKEN, 200),
(VALID_ADMIN_API_TOKEN, 200),
('', 401) ('', 401)
]) ])
def test_domain_get(client, token, status): def test_domain_get(client, token, status):
@ -18,6 +19,7 @@ def test_domain_get(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405), (VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405), (VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405) ('', 405)
]) ])
def test_domain_post_(client, token, status): def test_domain_post_(client, token, status):
@ -27,6 +29,7 @@ def test_domain_post_(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 400), (VALID_USER_HEADER_TOKEN, 400),
(VALID_ADMIN_HEADER_TOKEN, 400), (VALID_ADMIN_HEADER_TOKEN, 400),
(VALID_ADMIN_API_TOKEN, 400),
('', 401) ('', 401)
]) ])
def test_domain_put(client, token, status): def test_domain_put(client, token, status):
@ -36,6 +39,7 @@ def test_domain_put(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405), (VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405), (VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405) ('', 405)
]) ])
def test_domain_delete(client, token, status): def test_domain_delete(client, token, status):
@ -45,6 +49,7 @@ def test_domain_delete(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405), (VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405), (VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405) ('', 405)
]) ])
def test_domain_patch(client, token, status): def test_domain_patch(client, token, status):
@ -54,6 +59,7 @@ def test_domain_patch(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 400), (VALID_USER_HEADER_TOKEN, 400),
(VALID_ADMIN_HEADER_TOKEN, 400), (VALID_ADMIN_HEADER_TOKEN, 400),
(VALID_ADMIN_API_TOKEN, 400),
('', 401) ('', 401)
]) ])
def test_domain_list_post_(client, token, status): def test_domain_list_post_(client, token, status):
@ -63,6 +69,7 @@ def test_domain_list_post_(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 200), (VALID_USER_HEADER_TOKEN, 200),
(VALID_ADMIN_HEADER_TOKEN, 200), (VALID_ADMIN_HEADER_TOKEN, 200),
(VALID_ADMIN_API_TOKEN, 200),
('', 401) ('', 401)
]) ])
def test_domain_list_get(client, token, status): def test_domain_list_get(client, token, status):
@ -72,6 +79,7 @@ def test_domain_list_get(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405), (VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405), (VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405) ('', 405)
]) ])
def test_domain_list_delete(client, token, status): def test_domain_list_delete(client, token, status):
@ -81,6 +89,7 @@ def test_domain_list_delete(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405), (VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405), (VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405) ('', 405)
]) ])
def test_domain_list_patch(client, token, status): def test_domain_list_patch(client, token, status):

View File

@ -4,7 +4,7 @@ from lemur.endpoints.views import * # noqa
from lemur.tests.factories import EndpointFactory, CertificateFactory from lemur.tests.factories import EndpointFactory, CertificateFactory
from .vectors import VALID_ADMIN_HEADER_TOKEN, VALID_USER_HEADER_TOKEN from .vectors import VALID_ADMIN_API_TOKEN, VALID_ADMIN_HEADER_TOKEN, VALID_USER_HEADER_TOKEN
def test_rotate_certificate(client, source_plugin): def test_rotate_certificate(client, source_plugin):
@ -19,6 +19,7 @@ def test_rotate_certificate(client, source_plugin):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 404), (VALID_USER_HEADER_TOKEN, 404),
(VALID_ADMIN_HEADER_TOKEN, 404), (VALID_ADMIN_HEADER_TOKEN, 404),
(VALID_ADMIN_API_TOKEN, 404),
('', 401) ('', 401)
]) ])
def test_endpoint_get(client, token, status): def test_endpoint_get(client, token, status):
@ -28,6 +29,7 @@ def test_endpoint_get(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405), (VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405), (VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405) ('', 405)
]) ])
def test_endpoint_post_(client, token, status): def test_endpoint_post_(client, token, status):
@ -37,6 +39,7 @@ def test_endpoint_post_(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405), (VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405), (VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405) ('', 405)
]) ])
def test_endpoint_put(client, token, status): def test_endpoint_put(client, token, status):
@ -46,6 +49,7 @@ def test_endpoint_put(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405), (VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405), (VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405) ('', 405)
]) ])
def test_endpoint_delete(client, token, status): def test_endpoint_delete(client, token, status):
@ -55,6 +59,7 @@ def test_endpoint_delete(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405), (VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405), (VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405) ('', 405)
]) ])
def test_endpoint_patch(client, token, status): def test_endpoint_patch(client, token, status):
@ -64,6 +69,7 @@ def test_endpoint_patch(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405), (VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405), (VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405) ('', 405)
]) ])
def test_endpoint_list_post_(client, token, status): def test_endpoint_list_post_(client, token, status):
@ -73,6 +79,7 @@ def test_endpoint_list_post_(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 200), (VALID_USER_HEADER_TOKEN, 200),
(VALID_ADMIN_HEADER_TOKEN, 200), (VALID_ADMIN_HEADER_TOKEN, 200),
(VALID_ADMIN_API_TOKEN, 200),
('', 401) ('', 401)
]) ])
def test_endpoint_list_get(client, token, status): def test_endpoint_list_get(client, token, status):
@ -82,6 +89,7 @@ def test_endpoint_list_get(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405), (VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405), (VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405) ('', 405)
]) ])
def test_endpoint_list_delete(client, token, status): def test_endpoint_list_delete(client, token, status):
@ -91,6 +99,7 @@ def test_endpoint_list_delete(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405), (VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405), (VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405) ('', 405)
]) ])
def test_endpoint_list_patch(client, token, status): def test_endpoint_list_patch(client, token, status):

View File

@ -1,5 +1,5 @@
import pytest import pytest
from lemur.tests.vectors import VALID_ADMIN_HEADER_TOKEN, VALID_USER_HEADER_TOKEN from lemur.tests.vectors import VALID_ADMIN_API_TOKEN, VALID_ADMIN_HEADER_TOKEN, VALID_USER_HEADER_TOKEN
from lemur.logs.views import * # noqa from lemur.logs.views import * # noqa
@ -14,6 +14,7 @@ def test_private_key_audit(client, certificate):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 200), (VALID_USER_HEADER_TOKEN, 200),
(VALID_ADMIN_HEADER_TOKEN, 200), (VALID_ADMIN_HEADER_TOKEN, 200),
(VALID_ADMIN_API_TOKEN, 200),
('', 401) ('', 401)
]) ])
def test_get_logs(client, token, status): def test_get_logs(client, token, status):

View File

@ -3,7 +3,7 @@ import pytest
from lemur.notifications.views import * # noqa from lemur.notifications.views import * # noqa
from .vectors import VALID_ADMIN_HEADER_TOKEN, VALID_USER_HEADER_TOKEN from .vectors import VALID_ADMIN_API_TOKEN, VALID_ADMIN_HEADER_TOKEN, VALID_USER_HEADER_TOKEN
def test_notification_input_schema(client, notification_plugin, notification): def test_notification_input_schema(client, notification_plugin, notification):
@ -27,6 +27,7 @@ def test_notification_input_schema(client, notification_plugin, notification):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 200), (VALID_USER_HEADER_TOKEN, 200),
(VALID_ADMIN_HEADER_TOKEN, 200), (VALID_ADMIN_HEADER_TOKEN, 200),
(VALID_ADMIN_API_TOKEN, 200),
('', 401) ('', 401)
]) ])
def test_notification_get(client, notification_plugin, notification, token, status): def test_notification_get(client, notification_plugin, notification, token, status):
@ -36,6 +37,7 @@ def test_notification_get(client, notification_plugin, notification, token, stat
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405), (VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405), (VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405) ('', 405)
]) ])
def test_notification_post_(client, token, status): def test_notification_post_(client, token, status):
@ -45,6 +47,7 @@ def test_notification_post_(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 400), (VALID_USER_HEADER_TOKEN, 400),
(VALID_ADMIN_HEADER_TOKEN, 400), (VALID_ADMIN_HEADER_TOKEN, 400),
(VALID_ADMIN_API_TOKEN, 400),
('', 401) ('', 401)
]) ])
def test_notification_put(client, token, status): def test_notification_put(client, token, status):
@ -54,6 +57,7 @@ def test_notification_put(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 200), (VALID_USER_HEADER_TOKEN, 200),
(VALID_ADMIN_HEADER_TOKEN, 200), (VALID_ADMIN_HEADER_TOKEN, 200),
(VALID_ADMIN_API_TOKEN, 200),
('', 401) ('', 401)
]) ])
def test_notification_delete(client, token, status): def test_notification_delete(client, token, status):
@ -63,6 +67,7 @@ def test_notification_delete(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405), (VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405), (VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405) ('', 405)
]) ])
def test_notification_patch(client, token, status): def test_notification_patch(client, token, status):
@ -72,6 +77,7 @@ def test_notification_patch(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 400), (VALID_USER_HEADER_TOKEN, 400),
(VALID_ADMIN_HEADER_TOKEN, 400), (VALID_ADMIN_HEADER_TOKEN, 400),
(VALID_ADMIN_API_TOKEN, 400),
('', 401) ('', 401)
]) ])
def test_notification_list_post_(client, token, status): def test_notification_list_post_(client, token, status):
@ -81,6 +87,7 @@ def test_notification_list_post_(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 200), (VALID_USER_HEADER_TOKEN, 200),
(VALID_ADMIN_HEADER_TOKEN, 200), (VALID_ADMIN_HEADER_TOKEN, 200),
(VALID_ADMIN_API_TOKEN, 200),
('', 401) ('', 401)
]) ])
def test_notification_list_get(client, notification_plugin, notification, token, status): def test_notification_list_get(client, notification_plugin, notification, token, status):
@ -90,6 +97,7 @@ def test_notification_list_get(client, notification_plugin, notification, token,
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405), (VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405), (VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405) ('', 405)
]) ])
def test_notification_list_delete(client, token, status): def test_notification_list_delete(client, token, status):
@ -99,6 +107,7 @@ def test_notification_list_delete(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405), (VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405), (VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405) ('', 405)
]) ])
def test_notification_list_patch(client, token, status): def test_notification_list_patch(client, token, status):

View File

@ -4,7 +4,7 @@ import pytest
from lemur.roles.views import * # noqa from lemur.roles.views import * # noqa
from lemur.tests.factories import RoleFactory, AuthorityFactory, CertificateFactory, UserFactory from lemur.tests.factories import RoleFactory, AuthorityFactory, CertificateFactory, UserFactory
from .vectors import VALID_ADMIN_HEADER_TOKEN, VALID_USER_HEADER_TOKEN from .vectors import VALID_ADMIN_API_TOKEN, VALID_ADMIN_HEADER_TOKEN, VALID_USER_HEADER_TOKEN
def test_role_input_schema(client): def test_role_input_schema(client):
@ -41,6 +41,7 @@ def test_multiple_authority_certificate_association(session, client):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 403), (VALID_USER_HEADER_TOKEN, 403),
(VALID_ADMIN_HEADER_TOKEN, 200), (VALID_ADMIN_HEADER_TOKEN, 200),
(VALID_ADMIN_API_TOKEN, 200),
('', 401) ('', 401)
]) ])
def test_role_get(client, token, status): def test_role_get(client, token, status):
@ -50,6 +51,7 @@ def test_role_get(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405), (VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405), (VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405) ('', 405)
]) ])
def test_role_post_(client, token, status): def test_role_post_(client, token, status):
@ -59,6 +61,7 @@ def test_role_post_(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 400), (VALID_USER_HEADER_TOKEN, 400),
(VALID_ADMIN_HEADER_TOKEN, 400), (VALID_ADMIN_HEADER_TOKEN, 400),
(VALID_ADMIN_API_TOKEN, 400),
('', 401) ('', 401)
]) ])
def test_role_put(client, token, status): def test_role_put(client, token, status):
@ -68,6 +71,7 @@ def test_role_put(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 403), (VALID_USER_HEADER_TOKEN, 403),
(VALID_ADMIN_HEADER_TOKEN, 200), (VALID_ADMIN_HEADER_TOKEN, 200),
(VALID_ADMIN_API_TOKEN, 200),
('', 401) ('', 401)
]) ])
def test_role_put_with_data(client, session, token, status): def test_role_put_with_data(client, session, token, status):
@ -115,6 +119,7 @@ def test_role_put_with_data_and_user(client, session):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 403), (VALID_USER_HEADER_TOKEN, 403),
(VALID_ADMIN_HEADER_TOKEN, 200), (VALID_ADMIN_HEADER_TOKEN, 200),
(VALID_ADMIN_API_TOKEN, 200),
('', 401) ('', 401)
]) ])
def test_role_delete(client, token, status, role): def test_role_delete(client, token, status, role):
@ -124,6 +129,7 @@ def test_role_delete(client, token, status, role):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405), (VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405), (VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405) ('', 405)
]) ])
def test_role_patch(client, token, status): def test_role_patch(client, token, status):
@ -133,6 +139,7 @@ def test_role_patch(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 403), (VALID_USER_HEADER_TOKEN, 403),
(VALID_ADMIN_HEADER_TOKEN, 400), (VALID_ADMIN_HEADER_TOKEN, 400),
(VALID_ADMIN_API_TOKEN, 400),
('', 401) ('', 401)
]) ])
def test_role_list_post_(client, token, status): def test_role_list_post_(client, token, status):
@ -142,6 +149,7 @@ def test_role_list_post_(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 200), (VALID_USER_HEADER_TOKEN, 200),
(VALID_ADMIN_HEADER_TOKEN, 200), (VALID_ADMIN_HEADER_TOKEN, 200),
(VALID_ADMIN_API_TOKEN, 200),
('', 401) ('', 401)
]) ])
def test_role_list_get(client, token, status): def test_role_list_get(client, token, status):
@ -151,6 +159,7 @@ def test_role_list_get(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405), (VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405), (VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405) ('', 405)
]) ])
def test_role_list_delete(client, token, status): def test_role_list_delete(client, token, status):
@ -160,6 +169,7 @@ def test_role_list_delete(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405), (VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405), (VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405) ('', 405)
]) ])
def test_role_list_patch(client, token, status): def test_role_list_patch(client, token, status):

View File

@ -2,7 +2,7 @@ import pytest
from lemur.sources.views import * # noqa from lemur.sources.views import * # noqa
from .vectors import VALID_ADMIN_HEADER_TOKEN, VALID_USER_HEADER_TOKEN, INTERNAL_PRIVATE_KEY_A_STR, INTERNAL_VALID_WILDCARD_STR from .vectors import VALID_ADMIN_API_TOKEN, VALID_ADMIN_HEADER_TOKEN, VALID_USER_HEADER_TOKEN, INTERNAL_PRIVATE_KEY_A_STR, INTERNAL_VALID_WILDCARD_STR
def validate_source_schema(client): def validate_source_schema(client):
@ -38,6 +38,7 @@ def test_create_certificate(user, source):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 404), (VALID_USER_HEADER_TOKEN, 404),
(VALID_ADMIN_HEADER_TOKEN, 404), (VALID_ADMIN_HEADER_TOKEN, 404),
(VALID_ADMIN_API_TOKEN, 404),
('', 401) ('', 401)
]) ])
def test_source_get(client, source_plugin, token, status): def test_source_get(client, source_plugin, token, status):
@ -47,6 +48,7 @@ def test_source_get(client, source_plugin, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405), (VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405), (VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405) ('', 405)
]) ])
def test_source_post_(client, token, status): def test_source_post_(client, token, status):
@ -56,6 +58,7 @@ def test_source_post_(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 403), (VALID_USER_HEADER_TOKEN, 403),
(VALID_ADMIN_HEADER_TOKEN, 400), (VALID_ADMIN_HEADER_TOKEN, 400),
(VALID_ADMIN_API_TOKEN, 400),
('', 401) ('', 401)
]) ])
def test_source_put(client, token, status): def test_source_put(client, token, status):
@ -65,6 +68,7 @@ def test_source_put(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 403), (VALID_USER_HEADER_TOKEN, 403),
(VALID_ADMIN_HEADER_TOKEN, 200), (VALID_ADMIN_HEADER_TOKEN, 200),
(VALID_ADMIN_API_TOKEN, 200),
('', 401) ('', 401)
]) ])
def test_source_delete(client, token, status): def test_source_delete(client, token, status):
@ -74,6 +78,7 @@ def test_source_delete(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405), (VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405), (VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405) ('', 405)
]) ])
def test_source_patch(client, token, status): def test_source_patch(client, token, status):
@ -83,6 +88,7 @@ def test_source_patch(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 200), (VALID_USER_HEADER_TOKEN, 200),
(VALID_ADMIN_HEADER_TOKEN, 200), (VALID_ADMIN_HEADER_TOKEN, 200),
(VALID_ADMIN_API_TOKEN, 200),
('', 401) ('', 401)
]) ])
def test_sources_list_get(client, source_plugin, token, status): def test_sources_list_get(client, source_plugin, token, status):
@ -92,6 +98,7 @@ def test_sources_list_get(client, source_plugin, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 403), (VALID_USER_HEADER_TOKEN, 403),
(VALID_ADMIN_HEADER_TOKEN, 400), (VALID_ADMIN_HEADER_TOKEN, 400),
(VALID_ADMIN_API_TOKEN, 400),
('', 401) ('', 401)
]) ])
def test_sources_list_post(client, token, status): def test_sources_list_post(client, token, status):
@ -101,6 +108,7 @@ def test_sources_list_post(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405), (VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405), (VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405) ('', 405)
]) ])
def test_sources_list_put(client, token, status): def test_sources_list_put(client, token, status):
@ -110,6 +118,7 @@ def test_sources_list_put(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405), (VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405), (VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405) ('', 405)
]) ])
def test_sources_list_delete(client, token, status): def test_sources_list_delete(client, token, status):
@ -119,6 +128,7 @@ def test_sources_list_delete(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405), (VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405), (VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405) ('', 405)
]) ])
def test_sources_list_patch(client, token, status): def test_sources_list_patch(client, token, status):

View File

@ -4,7 +4,7 @@ import pytest
from lemur.tests.factories import UserFactory, RoleFactory from lemur.tests.factories import UserFactory, RoleFactory
from lemur.users.views import * # noqa from lemur.users.views import * # noqa
from .vectors import VALID_ADMIN_HEADER_TOKEN, VALID_USER_HEADER_TOKEN from .vectors import VALID_ADMIN_API_TOKEN, VALID_ADMIN_HEADER_TOKEN, VALID_USER_HEADER_TOKEN
def test_user_input_schema(client): def test_user_input_schema(client):
@ -24,6 +24,7 @@ def test_user_input_schema(client):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 200), (VALID_USER_HEADER_TOKEN, 200),
(VALID_ADMIN_HEADER_TOKEN, 200), (VALID_ADMIN_HEADER_TOKEN, 200),
(VALID_ADMIN_API_TOKEN, 200),
('', 401) ('', 401)
]) ])
def test_user_get(client, token, status): def test_user_get(client, token, status):
@ -33,6 +34,7 @@ def test_user_get(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405), (VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405), (VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405) ('', 405)
]) ])
def test_user_post_(client, token, status): def test_user_post_(client, token, status):
@ -42,6 +44,7 @@ def test_user_post_(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 403), (VALID_USER_HEADER_TOKEN, 403),
(VALID_ADMIN_HEADER_TOKEN, 400), (VALID_ADMIN_HEADER_TOKEN, 400),
(VALID_ADMIN_API_TOKEN, 400),
('', 401) ('', 401)
]) ])
def test_user_put(client, token, status): def test_user_put(client, token, status):
@ -51,6 +54,7 @@ def test_user_put(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405), (VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405), (VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405) ('', 405)
]) ])
def test_user_delete(client, token, status): def test_user_delete(client, token, status):
@ -60,6 +64,7 @@ def test_user_delete(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405), (VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405), (VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405) ('', 405)
]) ])
def test_user_patch(client, token, status): def test_user_patch(client, token, status):
@ -69,6 +74,7 @@ def test_user_patch(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 403), (VALID_USER_HEADER_TOKEN, 403),
(VALID_ADMIN_HEADER_TOKEN, 400), (VALID_ADMIN_HEADER_TOKEN, 400),
(VALID_ADMIN_API_TOKEN, 400),
('', 401) ('', 401)
]) ])
def test_user_list_post_(client, token, status): def test_user_list_post_(client, token, status):
@ -78,6 +84,7 @@ def test_user_list_post_(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 200), (VALID_USER_HEADER_TOKEN, 200),
(VALID_ADMIN_HEADER_TOKEN, 200), (VALID_ADMIN_HEADER_TOKEN, 200),
(VALID_ADMIN_API_TOKEN, 200),
('', 401) ('', 401)
]) ])
def test_user_list_get(client, token, status): def test_user_list_get(client, token, status):
@ -87,6 +94,7 @@ def test_user_list_get(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405), (VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405), (VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405) ('', 405)
]) ])
def test_user_list_delete(client, token, status): def test_user_list_delete(client, token, status):
@ -96,6 +104,7 @@ def test_user_list_delete(client, token, status):
@pytest.mark.parametrize("token,status", [ @pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405), (VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405), (VALID_ADMIN_HEADER_TOKEN, 405),
(VALID_ADMIN_API_TOKEN, 405),
('', 405) ('', 405)
]) ])
def test_user_list_patch(client, token, status): def test_user_list_patch(client, token, status):

View File

@ -12,6 +12,12 @@ VALID_ADMIN_HEADER_TOKEN = {
} }
VALID_ADMIN_API_TOKEN = {
'Authorization': 'Basic ' + 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOjIsImFpZCI6MSwiaWF0IjoxNDM1MjMzMzY5fQ.umW0I_oh4MVZ2qrClzj9SfYnQl6cd0HGzh9EwkDW60I',
'Content-Type': 'application/json'
}
INTERNAL_VALID_LONG_STR = """ INTERNAL_VALID_LONG_STR = """
-----BEGIN CERTIFICATE----- -----BEGIN CERTIFICATE-----
MIID1zCCAr+gAwIBAgIBATANBgkqhkiG9w0BAQsFADCBjDELMAkGA1UEBhMCVVMx MIID1zCCAr+gAwIBAgIBATANBgkqhkiG9w0BAQsFADCBjDELMAkGA1UEBhMCVVMx