375 lines
9.1 KiB
Python
375 lines
9.1 KiB
Python
import json
|
|
|
|
import pytest
|
|
|
|
from lemur.authorities.views import * # noqa
|
|
from lemur.tests.factories import AuthorityFactory, RoleFactory
|
|
from lemur.tests.vectors import (
|
|
VALID_ADMIN_API_TOKEN,
|
|
VALID_ADMIN_HEADER_TOKEN,
|
|
VALID_USER_HEADER_TOKEN,
|
|
)
|
|
|
|
|
|
def test_authority_input_schema(client, role, issuer_plugin, logged_in_user):
|
|
from lemur.authorities.schemas import AuthorityInputSchema
|
|
|
|
input_data = {
|
|
"name": "Example Authority",
|
|
"owner": "jim@example.com",
|
|
"description": "An example authority.",
|
|
"commonName": "An Example Authority",
|
|
"plugin": {
|
|
"slug": "test-issuer",
|
|
"plugin_options": [{"name": "test", "value": "blah"}],
|
|
},
|
|
"type": "root",
|
|
"signingAlgorithm": "sha256WithRSA",
|
|
"keyType": "RSA2048",
|
|
"sensitivity": "medium",
|
|
}
|
|
|
|
data, errors = AuthorityInputSchema().load(input_data)
|
|
|
|
assert not errors
|
|
|
|
|
|
def test_authority_input_schema_ecc(client, role, issuer_plugin, logged_in_user):
|
|
from lemur.authorities.schemas import AuthorityInputSchema
|
|
|
|
input_data = {
|
|
"name": "Example Authority",
|
|
"owner": "jim@example.com",
|
|
"description": "An example authority.",
|
|
"commonName": "An Example Authority",
|
|
"plugin": {
|
|
"slug": "test-issuer",
|
|
"plugin_options": [{"name": "test", "value": "blah"}],
|
|
},
|
|
"type": "root",
|
|
"signingAlgorithm": "sha256WithECDSA",
|
|
"keyType": "ECCPRIME256V1",
|
|
"sensitivity": "medium",
|
|
}
|
|
|
|
data, errors = AuthorityInputSchema().load(input_data)
|
|
|
|
assert not errors
|
|
|
|
|
|
def test_user_authority(session, client, authority, role, user, issuer_plugin):
|
|
u = user["user"]
|
|
u.roles.append(role)
|
|
authority.roles.append(role)
|
|
session.commit()
|
|
assert (
|
|
client.get(api.url_for(AuthoritiesList), headers=user["token"]).json["total"]
|
|
== 1
|
|
)
|
|
u.roles.remove(role)
|
|
session.commit()
|
|
assert (
|
|
client.get(api.url_for(AuthoritiesList), headers=user["token"]).json["total"]
|
|
== 0
|
|
)
|
|
|
|
|
|
def test_create_authority(issuer_plugin, user):
|
|
from lemur.authorities.service import create
|
|
|
|
authority = create(
|
|
plugin={"plugin_object": issuer_plugin, "slug": issuer_plugin.slug},
|
|
owner="jim@example.com",
|
|
type="root",
|
|
creator=user["user"],
|
|
)
|
|
assert authority.authority_certificate
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"token, count",
|
|
[
|
|
(VALID_USER_HEADER_TOKEN, 0),
|
|
(VALID_ADMIN_HEADER_TOKEN, 3),
|
|
(VALID_ADMIN_API_TOKEN, 3),
|
|
],
|
|
)
|
|
def test_admin_authority(client, authority, issuer_plugin, token, count):
|
|
assert (
|
|
client.get(api.url_for(AuthoritiesList), headers=token).json["total"] == count
|
|
)
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"token,status",
|
|
[
|
|
(VALID_USER_HEADER_TOKEN, 200),
|
|
(VALID_ADMIN_HEADER_TOKEN, 200),
|
|
(VALID_ADMIN_API_TOKEN, 200),
|
|
("", 401),
|
|
],
|
|
)
|
|
def test_authority_get(client, token, status):
|
|
assert (
|
|
client.get(api.url_for(Authorities, authority_id=1), headers=token).status_code
|
|
== status
|
|
)
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"token,status",
|
|
[
|
|
(VALID_USER_HEADER_TOKEN, 405),
|
|
(VALID_ADMIN_HEADER_TOKEN, 405),
|
|
(VALID_ADMIN_API_TOKEN, 405),
|
|
("", 405),
|
|
],
|
|
)
|
|
def test_authority_post(client, token, status):
|
|
assert (
|
|
client.post(
|
|
api.url_for(Authorities, authority_id=1), data={}, headers=token
|
|
).status_code
|
|
== status
|
|
)
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"token,status",
|
|
[
|
|
(VALID_USER_HEADER_TOKEN, 400),
|
|
(VALID_ADMIN_HEADER_TOKEN, 400),
|
|
(VALID_ADMIN_API_TOKEN, 400),
|
|
("", 401),
|
|
],
|
|
)
|
|
def test_authority_put(client, token, status):
|
|
assert (
|
|
client.put(
|
|
api.url_for(Authorities, authority_id=1), data={}, headers=token
|
|
).status_code
|
|
== status
|
|
)
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"token,status",
|
|
[
|
|
(VALID_USER_HEADER_TOKEN, 405),
|
|
(VALID_ADMIN_HEADER_TOKEN, 405),
|
|
(VALID_ADMIN_API_TOKEN, 405),
|
|
("", 405),
|
|
],
|
|
)
|
|
def test_authority_delete(client, token, status):
|
|
assert (
|
|
client.delete(
|
|
api.url_for(Authorities, authority_id=1), headers=token
|
|
).status_code
|
|
== status
|
|
)
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"token,status",
|
|
[
|
|
(VALID_USER_HEADER_TOKEN, 405),
|
|
(VALID_ADMIN_HEADER_TOKEN, 405),
|
|
(VALID_ADMIN_API_TOKEN, 405),
|
|
("", 405),
|
|
],
|
|
)
|
|
def test_authority_patch(client, token, status):
|
|
assert (
|
|
client.patch(
|
|
api.url_for(Authorities, authority_id=1), data={}, headers=token
|
|
).status_code
|
|
== status
|
|
)
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"token,status",
|
|
[
|
|
(VALID_USER_HEADER_TOKEN, 200),
|
|
(VALID_ADMIN_HEADER_TOKEN, 200),
|
|
(VALID_ADMIN_API_TOKEN, 200),
|
|
("", 401),
|
|
],
|
|
)
|
|
def test_authorities_get(client, token, status):
|
|
assert client.get(api.url_for(AuthoritiesList), headers=token).status_code == status
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"token,status",
|
|
[
|
|
(VALID_USER_HEADER_TOKEN, 400),
|
|
(VALID_ADMIN_HEADER_TOKEN, 400),
|
|
(VALID_ADMIN_API_TOKEN, 400),
|
|
("", 401),
|
|
],
|
|
)
|
|
def test_authorities_post(client, token, status):
|
|
assert (
|
|
client.post(api.url_for(AuthoritiesList), data={}, headers=token).status_code
|
|
== status
|
|
)
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"token,status",
|
|
[
|
|
(VALID_USER_HEADER_TOKEN, 405),
|
|
(VALID_ADMIN_HEADER_TOKEN, 405),
|
|
(VALID_ADMIN_API_TOKEN, 405),
|
|
("", 405),
|
|
],
|
|
)
|
|
def test_authorities_put(client, token, status):
|
|
assert (
|
|
client.put(api.url_for(AuthoritiesList), data={}, headers=token).status_code
|
|
== status
|
|
)
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"token,status",
|
|
[
|
|
(VALID_USER_HEADER_TOKEN, 405),
|
|
(VALID_ADMIN_HEADER_TOKEN, 405),
|
|
(VALID_ADMIN_API_TOKEN, 405),
|
|
("", 405),
|
|
],
|
|
)
|
|
def test_authorities_delete(client, token, status):
|
|
assert (
|
|
client.delete(api.url_for(AuthoritiesList), headers=token).status_code == status
|
|
)
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"token,status",
|
|
[
|
|
(VALID_USER_HEADER_TOKEN, 405),
|
|
(VALID_ADMIN_HEADER_TOKEN, 405),
|
|
(VALID_ADMIN_API_TOKEN, 405),
|
|
("", 405),
|
|
],
|
|
)
|
|
def test_authorities_patch(client, token, status):
|
|
assert (
|
|
client.patch(api.url_for(AuthoritiesList), data={}, headers=token).status_code
|
|
== status
|
|
)
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"token,status",
|
|
[
|
|
(VALID_USER_HEADER_TOKEN, 200),
|
|
(VALID_ADMIN_HEADER_TOKEN, 200),
|
|
(VALID_ADMIN_API_TOKEN, 200),
|
|
("", 401),
|
|
],
|
|
)
|
|
def test_certificate_authorities_get(client, token, status):
|
|
assert client.get(api.url_for(AuthoritiesList), headers=token).status_code == status
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"token,status",
|
|
[
|
|
(VALID_USER_HEADER_TOKEN, 400),
|
|
(VALID_ADMIN_HEADER_TOKEN, 400),
|
|
(VALID_ADMIN_API_TOKEN, 400),
|
|
("", 401),
|
|
],
|
|
)
|
|
def test_certificate_authorities_post(client, token, status):
|
|
assert (
|
|
client.post(api.url_for(AuthoritiesList), data={}, headers=token).status_code
|
|
== status
|
|
)
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"token,status",
|
|
[
|
|
(VALID_USER_HEADER_TOKEN, 405),
|
|
(VALID_ADMIN_HEADER_TOKEN, 405),
|
|
(VALID_ADMIN_API_TOKEN, 405),
|
|
("", 405),
|
|
],
|
|
)
|
|
def test_certificate_authorities_put(client, token, status):
|
|
assert (
|
|
client.put(api.url_for(AuthoritiesList), data={}, headers=token).status_code
|
|
== status
|
|
)
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"token,status",
|
|
[
|
|
(VALID_USER_HEADER_TOKEN, 405),
|
|
(VALID_ADMIN_HEADER_TOKEN, 405),
|
|
(VALID_ADMIN_API_TOKEN, 405),
|
|
("", 405),
|
|
],
|
|
)
|
|
def test_certificate_authorities_delete(client, token, status):
|
|
assert (
|
|
client.delete(api.url_for(AuthoritiesList), headers=token).status_code == status
|
|
)
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"token,status",
|
|
[
|
|
(VALID_USER_HEADER_TOKEN, 405),
|
|
(VALID_ADMIN_HEADER_TOKEN, 405),
|
|
(VALID_ADMIN_API_TOKEN, 405),
|
|
("", 405),
|
|
],
|
|
)
|
|
def test_certificate_authorities_patch(client, token, status):
|
|
assert (
|
|
client.patch(api.url_for(AuthoritiesList), data={}, headers=token).status_code
|
|
== status
|
|
)
|
|
|
|
|
|
def test_authority_roles(client, session, issuer_plugin):
|
|
auth = AuthorityFactory()
|
|
role = RoleFactory()
|
|
session.flush()
|
|
|
|
data = {
|
|
"owner": auth.owner,
|
|
"name": auth.name,
|
|
"description": auth.description,
|
|
"active": True,
|
|
"roles": [{"id": role.id}],
|
|
}
|
|
|
|
# Add role
|
|
resp = client.put(
|
|
api.url_for(Authorities, authority_id=auth.id),
|
|
data=json.dumps(data),
|
|
headers=VALID_ADMIN_HEADER_TOKEN,
|
|
)
|
|
assert resp.status_code == 200
|
|
assert len(resp.json["roles"]) == 1
|
|
assert set(auth.roles) == {role}
|
|
|
|
# Remove role
|
|
del data["roles"][0]
|
|
resp = client.put(
|
|
api.url_for(Authorities, authority_id=auth.id),
|
|
data=json.dumps(data),
|
|
headers=VALID_ADMIN_HEADER_TOKEN,
|
|
)
|
|
assert resp.status_code == 200
|
|
assert len(resp.json["roles"]) == 0
|