c402f1ff87
Adds in per user api keys to the backend of lemur. the basics are: - API Keys are really just JWTs with custom second length TTLs. - API Keys are provided in the exact same ways JWTs are now. - API Keys can be revoked/unrevoked at any time by their creator as well as have their TTL Change at anytime. - Users can create/view/list their own API Keys at will, and an admin role has permission to modify all api keys in the instance. Adds in support for lemur api keys to the frontend of lemur. doing this required a few changes to the backend as well, but it is now all working (maybe not the best way though, review will determine that). - fixes inconsistency in moduleauthor name I inputted during the first commit. - Allows the revoke schema to optionally allow a full api_key object. - Adds `/users/:user_id/api_keys/:api_key` and `/users/:user_id/api_keys` endpoints. - normalizes use of `userId` vs `userId` - makes `put` call respond with a JWT so the frontend can show the token on updating. - adds in the API Key views for clicking "API Keys" on the main nav. - adds in the API Key views for clicking into a users edit page. - adds tests for the API Key backend views I added.
233 lines
7.5 KiB
Python
233 lines
7.5 KiB
Python
import json
|
|
|
|
import pytest
|
|
|
|
from lemur.authorities.views import * # noqa
|
|
from lemur.tests.factories import AuthorityFactory, RoleFactory
|
|
from lemur.tests.vectors import VALID_ADMIN_API_TOKEN, VALID_ADMIN_HEADER_TOKEN, VALID_USER_HEADER_TOKEN
|
|
|
|
|
|
def test_authority_input_schema(client, role, issuer_plugin, logged_in_user):
|
|
from lemur.authorities.schemas import AuthorityInputSchema
|
|
|
|
input_data = {
|
|
'name': 'Example Authority',
|
|
'owner': 'jim@example.com',
|
|
'description': 'An example authority.',
|
|
'commonName': 'An Example Authority',
|
|
'plugin': {'slug': 'test-issuer', 'plugin_options': [{'name': 'test', 'value': 'blah'}]},
|
|
'type': 'root',
|
|
'signingAlgorithm': 'sha256WithRSA',
|
|
'keyType': 'RSA2048',
|
|
'sensitivity': 'medium'
|
|
}
|
|
|
|
data, errors = AuthorityInputSchema().load(input_data)
|
|
|
|
assert not errors
|
|
|
|
|
|
def test_user_authority(session, client, authority, role, user, issuer_plugin):
|
|
resp = client.get(api.url_for(AuthoritiesList), headers=user['token']).json
|
|
u = user['user']
|
|
u.roles.append(role)
|
|
authority.roles.append(role)
|
|
session.commit()
|
|
assert client.get(api.url_for(AuthoritiesList), headers=user['token']).json['total'] == 1
|
|
u.roles.remove(role)
|
|
session.commit()
|
|
assert client.get(api.url_for(AuthoritiesList), headers=user['token']).json['total'] == 0
|
|
|
|
|
|
def test_create_authority(issuer_plugin, user):
|
|
from lemur.authorities.service import create
|
|
authority = create(plugin={'plugin_object': issuer_plugin, 'slug': issuer_plugin.slug}, owner='jim@example.com', type='root', creator=user['user'])
|
|
assert authority.authority_certificate
|
|
|
|
|
|
@pytest.mark.parametrize("token, count", [
|
|
(VALID_USER_HEADER_TOKEN, 0),
|
|
(VALID_ADMIN_HEADER_TOKEN, 3),
|
|
(VALID_ADMIN_API_TOKEN, 3),
|
|
])
|
|
def test_admin_authority(client, authority, issuer_plugin, token, count):
|
|
assert client.get(api.url_for(AuthoritiesList), headers=token).json['total'] == count
|
|
|
|
|
|
@pytest.mark.parametrize("token,status", [
|
|
(VALID_USER_HEADER_TOKEN, 200),
|
|
(VALID_ADMIN_HEADER_TOKEN, 200),
|
|
(VALID_ADMIN_API_TOKEN, 200),
|
|
('', 401)
|
|
])
|
|
def test_authority_get(client, token, status):
|
|
assert client.get(api.url_for(Authorities, authority_id=1), headers=token).status_code == status
|
|
|
|
|
|
@pytest.mark.parametrize("token,status", [
|
|
(VALID_USER_HEADER_TOKEN, 405),
|
|
(VALID_ADMIN_HEADER_TOKEN, 405),
|
|
(VALID_ADMIN_API_TOKEN, 405),
|
|
('', 405)
|
|
])
|
|
def test_authority_post(client, token, status):
|
|
assert client.post(api.url_for(Authorities, authority_id=1), data={}, headers=token).status_code == status
|
|
|
|
|
|
@pytest.mark.parametrize("token,status", [
|
|
(VALID_USER_HEADER_TOKEN, 400),
|
|
(VALID_ADMIN_HEADER_TOKEN, 400),
|
|
(VALID_ADMIN_API_TOKEN, 400),
|
|
('', 401)
|
|
])
|
|
def test_authority_put(client, token, status):
|
|
assert client.put(api.url_for(Authorities, authority_id=1), data={}, headers=token).status_code == status
|
|
|
|
|
|
@pytest.mark.parametrize("token,status", [
|
|
(VALID_USER_HEADER_TOKEN, 405),
|
|
(VALID_ADMIN_HEADER_TOKEN, 405),
|
|
(VALID_ADMIN_API_TOKEN, 405),
|
|
('', 405)
|
|
])
|
|
def test_authority_delete(client, token, status):
|
|
assert client.delete(api.url_for(Authorities, authority_id=1), headers=token).status_code == status
|
|
|
|
|
|
@pytest.mark.parametrize("token,status", [
|
|
(VALID_USER_HEADER_TOKEN, 405),
|
|
(VALID_ADMIN_HEADER_TOKEN, 405),
|
|
(VALID_ADMIN_API_TOKEN, 405),
|
|
('', 405)
|
|
])
|
|
def test_authority_patch(client, token, status):
|
|
assert client.patch(api.url_for(Authorities, authority_id=1), data={}, headers=token).status_code == status
|
|
|
|
|
|
@pytest.mark.parametrize("token,status", [
|
|
(VALID_USER_HEADER_TOKEN, 200),
|
|
(VALID_ADMIN_HEADER_TOKEN, 200),
|
|
(VALID_ADMIN_API_TOKEN, 200),
|
|
('', 401)
|
|
])
|
|
def test_authorities_get(client, token, status):
|
|
assert client.get(api.url_for(AuthoritiesList), headers=token).status_code == status
|
|
|
|
|
|
@pytest.mark.parametrize("token,status", [
|
|
(VALID_USER_HEADER_TOKEN, 400),
|
|
(VALID_ADMIN_HEADER_TOKEN, 400),
|
|
(VALID_ADMIN_API_TOKEN, 400),
|
|
('', 401)
|
|
])
|
|
def test_authorities_post(client, token, status):
|
|
assert client.post(api.url_for(AuthoritiesList), data={}, headers=token).status_code == status
|
|
|
|
|
|
@pytest.mark.parametrize("token,status", [
|
|
(VALID_USER_HEADER_TOKEN, 405),
|
|
(VALID_ADMIN_HEADER_TOKEN, 405),
|
|
(VALID_ADMIN_API_TOKEN, 405),
|
|
('', 405)
|
|
])
|
|
def test_authorities_put(client, token, status):
|
|
assert client.put(api.url_for(AuthoritiesList), data={}, headers=token).status_code == status
|
|
|
|
|
|
@pytest.mark.parametrize("token,status", [
|
|
(VALID_USER_HEADER_TOKEN, 405),
|
|
(VALID_ADMIN_HEADER_TOKEN, 405),
|
|
(VALID_ADMIN_API_TOKEN, 405),
|
|
('', 405)
|
|
])
|
|
def test_authorities_delete(client, token, status):
|
|
assert client.delete(api.url_for(AuthoritiesList), headers=token).status_code == status
|
|
|
|
|
|
@pytest.mark.parametrize("token,status", [
|
|
(VALID_USER_HEADER_TOKEN, 405),
|
|
(VALID_ADMIN_HEADER_TOKEN, 405),
|
|
(VALID_ADMIN_API_TOKEN, 405),
|
|
('', 405)
|
|
])
|
|
def test_authorities_patch(client, token, status):
|
|
assert client.patch(api.url_for(AuthoritiesList), data={}, headers=token).status_code == status
|
|
|
|
|
|
@pytest.mark.parametrize("token,status", [
|
|
(VALID_USER_HEADER_TOKEN, 200),
|
|
(VALID_ADMIN_HEADER_TOKEN, 200),
|
|
(VALID_ADMIN_API_TOKEN, 200),
|
|
('', 401)
|
|
])
|
|
def test_certificate_authorities_get(client, token, status):
|
|
assert client.get(api.url_for(AuthoritiesList), headers=token).status_code == status
|
|
|
|
|
|
@pytest.mark.parametrize("token,status", [
|
|
(VALID_USER_HEADER_TOKEN, 400),
|
|
(VALID_ADMIN_HEADER_TOKEN, 400),
|
|
(VALID_ADMIN_API_TOKEN, 400),
|
|
('', 401)
|
|
])
|
|
def test_certificate_authorities_post(client, token, status):
|
|
assert client.post(api.url_for(AuthoritiesList), data={}, headers=token).status_code == status
|
|
|
|
|
|
@pytest.mark.parametrize("token,status", [
|
|
(VALID_USER_HEADER_TOKEN, 405),
|
|
(VALID_ADMIN_HEADER_TOKEN, 405),
|
|
(VALID_ADMIN_API_TOKEN, 405),
|
|
('', 405)
|
|
])
|
|
def test_certificate_authorities_put(client, token, status):
|
|
assert client.put(api.url_for(AuthoritiesList), data={}, headers=token).status_code == status
|
|
|
|
|
|
@pytest.mark.parametrize("token,status", [
|
|
(VALID_USER_HEADER_TOKEN, 405),
|
|
(VALID_ADMIN_HEADER_TOKEN, 405),
|
|
(VALID_ADMIN_API_TOKEN, 405),
|
|
('', 405)
|
|
])
|
|
def test_certificate_authorities_delete(client, token, status):
|
|
assert client.delete(api.url_for(AuthoritiesList), headers=token).status_code == status
|
|
|
|
|
|
@pytest.mark.parametrize("token,status", [
|
|
(VALID_USER_HEADER_TOKEN, 405),
|
|
(VALID_ADMIN_HEADER_TOKEN, 405),
|
|
(VALID_ADMIN_API_TOKEN, 405),
|
|
('', 405)
|
|
])
|
|
def test_certificate_authorities_patch(client, token, status):
|
|
assert client.patch(api.url_for(AuthoritiesList), data={}, headers=token).status_code == status
|
|
|
|
|
|
def test_authority_roles(client, session, issuer_plugin):
|
|
auth = AuthorityFactory()
|
|
role = RoleFactory()
|
|
session.flush()
|
|
|
|
data = {
|
|
'owner': auth.owner,
|
|
'name': auth.name,
|
|
'description': auth.description,
|
|
'active': True,
|
|
'roles': [
|
|
{'id': role.id},
|
|
],
|
|
}
|
|
|
|
# Add role
|
|
resp = client.put(api.url_for(Authorities, authority_id=auth.id), data=json.dumps(data), headers=VALID_ADMIN_HEADER_TOKEN)
|
|
assert resp.status_code == 200
|
|
assert len(resp.json['roles']) == 1
|
|
assert set(auth.roles) == {role}
|
|
|
|
# Remove role
|
|
del data['roles'][0]
|
|
resp = client.put(api.url_for(Authorities, authority_id=auth.id), data=json.dumps(data), headers=VALID_ADMIN_HEADER_TOKEN)
|
|
assert resp.status_code == 200
|
|
assert len(resp.json['roles']) == 0
|