Merge branch 'master' into feature/store-acme-account-details

This commit is contained in:
Mathias Petermann 2020-10-11 14:37:31 +02:00 committed by GitHub
commit 817fc3f0fe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 353 additions and 67 deletions

View File

@ -43,13 +43,13 @@ class AuthorityInputSchema(LemurInputSchema):
organization = fields.String( organization = fields.String(
missing=lambda: current_app.config.get("LEMUR_DEFAULT_ORGANIZATION") missing=lambda: current_app.config.get("LEMUR_DEFAULT_ORGANIZATION")
) )
location = fields.String( location = fields.String()
missing=lambda: current_app.config.get("LEMUR_DEFAULT_LOCATION")
)
country = fields.String( country = fields.String(
missing=lambda: current_app.config.get("LEMUR_DEFAULT_COUNTRY") missing=lambda: current_app.config.get("LEMUR_DEFAULT_COUNTRY")
) )
state = fields.String(missing=lambda: current_app.config.get("LEMUR_DEFAULT_STATE")) state = fields.String(missing=lambda: current_app.config.get("LEMUR_DEFAULT_STATE"))
# Creating a String field instead of Email to allow empty value
email = fields.String()
plugin = fields.Nested(PluginInputSchema) plugin = fields.Nested(PluginInputSchema)

View File

@ -23,6 +23,7 @@ from lemur.domains.schemas import DomainNestedOutputSchema
from lemur.notifications import service as notification_service from lemur.notifications import service as notification_service
from lemur.notifications.schemas import NotificationNestedOutputSchema from lemur.notifications.schemas import NotificationNestedOutputSchema
from lemur.policies.schemas import RotationPolicyNestedOutputSchema from lemur.policies.schemas import RotationPolicyNestedOutputSchema
from lemur.roles import service as roles_service
from lemur.roles.schemas import RoleNestedOutputSchema from lemur.roles.schemas import RoleNestedOutputSchema
from lemur.schemas import ( from lemur.schemas import (
AssociatedAuthoritySchema, AssociatedAuthoritySchema,
@ -107,9 +108,7 @@ class CertificateInputSchema(CertificateCreationSchema):
organization = fields.String( organization = fields.String(
missing=lambda: current_app.config.get("LEMUR_DEFAULT_ORGANIZATION") missing=lambda: current_app.config.get("LEMUR_DEFAULT_ORGANIZATION")
) )
location = fields.String( location = fields.String()
missing=lambda: current_app.config.get("LEMUR_DEFAULT_LOCATION")
)
country = fields.String( country = fields.String(
missing=lambda: current_app.config.get("LEMUR_DEFAULT_COUNTRY") missing=lambda: current_app.config.get("LEMUR_DEFAULT_COUNTRY")
) )
@ -186,25 +185,52 @@ class CertificateEditInputSchema(CertificateSchema):
data["replaces"] = data[ data["replaces"] = data[
"replacements" "replacements"
] # TODO remove when field is deprecated ] # TODO remove when field is deprecated
if data.get("owner"):
# Check if role already exists. This avoids adding duplicate role.
if data.get("roles") and any(r.get("name") == data["owner"] for r in data["roles"]):
return data
# Add required role
owner_role = roles_service.get_or_create(
data["owner"],
description=f"Auto generated role based on owner: {data['owner']}"
)
# Put role info in correct format using RoleNestedOutputSchema
owner_role_dict = RoleNestedOutputSchema().dump(owner_role).data
if data.get("roles"):
data["roles"].append(owner_role_dict)
else:
data["roles"] = [owner_role_dict]
return data return data
@post_load @post_load
def enforce_notifications(self, data): def enforce_notifications(self, data):
""" """
Ensures that when an owner changes, default notifications are added for the new owner. Add default notification for current owner if none exist.
Old owner notifications are retained unless explicitly removed. This ensures that the default notifications are added in the event of owner change.
Old owner notifications are retained unless explicitly removed later in the code path.
:param data: :param data:
:return: :return:
""" """
if data["owner"]: if data.get("owner"):
notification_name = "DEFAULT_{0}".format( notification_name = "DEFAULT_{0}".format(
data["owner"].split("@")[0].upper() data["owner"].split("@")[0].upper()
) )
# Even if one default role exists, return
# This allows a User to remove unwanted default notification for current owner
if any(n.label.startswith(notification_name) for n in data["notifications"]):
return data
data[ data[
"notifications" "notifications"
] += notification_service.create_default_expiration_notifications( ] += notification_service.create_default_expiration_notifications(
notification_name, [data["owner"]] notification_name, [data["owner"]]
) )
return data return data

View File

@ -256,17 +256,29 @@ def update(cert_id, **kwargs):
return database.update(cert) return database.update(cert)
def create_certificate_roles(**kwargs): def cleanup_owner_roles_notification(owner_name, kwargs):
# create an role for the owner and assign it kwargs["roles"] = [r for r in kwargs["roles"] if r.name != owner_name]
owner_role = role_service.get_by_name(kwargs["owner"]) notification_prefix = f"DEFAULT_{owner_name.split('@')[0].upper()}"
kwargs["notifications"] = [n for n in kwargs["notifications"] if not n.label.startswith(notification_prefix)]
if not owner_role:
owner_role = role_service.create( def update_notify(cert, notify_flag):
kwargs["owner"], """
description="Auto generated role based on owner: {0}".format( Toggle notification value which is a boolean
kwargs["owner"] :param notify_flag: new notify value
), :param cert: Certificate object to be updated
) :return:
"""
cert.notify = notify_flag
return database.update(cert)
def create_certificate_roles(**kwargs):
# create a role for the owner and assign it
owner_role = role_service.get_or_create(
kwargs["owner"],
description=f"Auto generated role based on owner: {kwargs['owner']}"
)
# ensure that the authority's owner is also associated with the certificate # ensure that the authority's owner is also associated with the certificate
if kwargs.get("authority"): if kwargs.get("authority"):

View File

@ -884,10 +884,118 @@ class Certificates(AuthenticatedResource):
400, 400,
) )
# if owner is changed, remove all notifications and roles associated with old owner
if cert.owner != data["owner"]:
service.cleanup_owner_roles_notification(cert.owner, data)
cert = service.update(certificate_id, **data) cert = service.update(certificate_id, **data)
log_service.create(g.current_user, "update_cert", certificate=cert) log_service.create(g.current_user, "update_cert", certificate=cert)
return cert return cert
@validate_schema(certificate_edit_input_schema, certificate_output_schema)
def post(self, certificate_id, data=None):
"""
.. http:post:: /certificates/1/update/notify
Update certificate notification
**Example request**:
.. sourcecode:: http
POST /certificates/1/update/notify HTTP/1.1
Host: example.com
Accept: application/json, text/javascript
{
"notify": false
}
**Example response**:
.. sourcecode:: http
HTTP/1.1 200 OK
Vary: Accept
Content-Type: text/javascript
{
"status": null,
"cn": "*.test.example.net",
"chain": "",
"authority": {
"active": true,
"owner": "secure@example.com",
"id": 1,
"description": "verisign test authority",
"name": "verisign"
},
"owner": "joe@example.com",
"serial": "82311058732025924142789179368889309156",
"id": 2288,
"issuer": "SymantecCorporation",
"dateCreated": "2016-06-03T06:09:42.133769+00:00",
"notBefore": "2016-06-03T00:00:00+00:00",
"notAfter": "2018-01-12T23:59:59+00:00",
"destinations": [],
"bits": 2048,
"body": "-----BEGIN CERTIFICATE-----...",
"description": null,
"deleted": null,
"notify": false,
"notifications": [{
"id": 1
}]
"signingAlgorithm": "sha256",
"user": {
"username": "jane",
"active": true,
"email": "jane@example.com",
"id": 2
},
"active": true,
"domains": [{
"sensitive": false,
"id": 1090,
"name": "*.test.example.net"
}],
"replaces": [],
"name": "WILDCARD.test.example.net-SymantecCorporation-20160603-20180112",
"roles": [{
"id": 464,
"description": "This is a google group based role created by Lemur",
"name": "joe@example.com"
}],
"rotation": true,
"rotationPolicy": {"name": "default"},
"san": null
}
:reqheader Authorization: OAuth token to authenticate
:statuscode 200: no error
:statuscode 403: unauthenticated
"""
cert = service.get(certificate_id)
if not cert:
return dict(message="Cannot find specified certificate"), 404
# allow creators
if g.current_user != cert.user:
owner_role = role_service.get_by_name(cert.owner)
permission = CertificatePermission(owner_role, [x.name for x in cert.roles])
if not permission.can():
return (
dict(message="You are not authorized to update this certificate"),
403,
)
cert = service.update_notify(cert, data.get("notify"))
log_service.create(g.current_user, "update_cert", certificate=cert)
return cert
def delete(self, certificate_id, data=None): def delete(self, certificate_id, data=None):
""" """
.. http:delete:: /certificates/1 .. http:delete:: /certificates/1
@ -1354,6 +1462,9 @@ api.add_resource(
api.add_resource( api.add_resource(
Certificates, "/certificates/<int:certificate_id>", endpoint="certificate" Certificates, "/certificates/<int:certificate_id>", endpoint="certificate"
) )
api.add_resource(
Certificates, "/certificates/<int:certificate_id>/update/notify", endpoint="certificateUpdateNotify"
)
api.add_resource(CertificatesStats, "/certificates/stats", endpoint="certificateStats") api.add_resource(CertificatesStats, "/certificates/stats", endpoint="certificateStats")
api.add_resource( api.add_resource(
CertificatesUpload, "/certificates/upload", endpoint="certificateUpload" CertificatesUpload, "/certificates/upload", endpoint="certificateUpload"

View File

@ -9,6 +9,7 @@
import random import random
import re import re
import string import string
import pem
import sqlalchemy import sqlalchemy
from cryptography import x509 from cryptography import x509
@ -16,7 +17,7 @@ from cryptography.exceptions import InvalidSignature, UnsupportedAlgorithm
from cryptography.hazmat.backends import default_backend from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import rsa, ec, padding from cryptography.hazmat.primitives.asymmetric import rsa, ec, padding
from cryptography.hazmat.primitives.serialization import load_pem_private_key from cryptography.hazmat.primitives.serialization import load_pem_private_key, Encoding, pkcs7
from flask_restful.reqparse import RequestParser from flask_restful.reqparse import RequestParser
from sqlalchemy import and_, func from sqlalchemy import and_, func
@ -357,3 +358,19 @@ def find_matching_certificates_by_hash(cert, matching_certs):
): ):
matching.append(c) matching.append(c)
return matching return matching
def convert_pkcs7_bytes_to_pem(certs_pkcs7):
"""
Given a list of certificates in pkcs7 encoding (bytes), covert them into a list of PEM encoded files
:raises ValueError or ValidationError
:param certs_pkcs7:
:return: list of certs in PEM format
"""
certificates = pkcs7.load_pem_pkcs7_certificates(certs_pkcs7)
certificates_pem = []
for cert in certificates:
certificates_pem.append(pem.parse(cert.public_bytes(encoding=Encoding.PEM))[0])
return certificates_pem

View File

@ -21,7 +21,7 @@ import requests
import sys import sys
from cryptography import x509 from cryptography import x509
from flask import current_app, g from flask import current_app, g
from lemur.common.utils import validate_conf from lemur.common.utils import validate_conf, convert_pkcs7_bytes_to_pem
from lemur.extensions import metrics from lemur.extensions import metrics
from lemur.plugins import lemur_digicert as digicert from lemur.plugins import lemur_digicert as digicert
from lemur.plugins.bases import IssuerPlugin, SourcePlugin from lemur.plugins.bases import IssuerPlugin, SourcePlugin
@ -235,15 +235,18 @@ def get_certificate_id(session, base_url, order_id):
@retry(stop_max_attempt_number=10, wait_fixed=10000) @retry(stop_max_attempt_number=10, wait_fixed=10000)
def get_cis_certificate(session, base_url, order_id): def get_cis_certificate(session, base_url, order_id):
"""Retrieve certificate order id from Digicert API.""" """Retrieve certificate order id from Digicert API, including the chain"""
certificate_url = "{0}/platform/cis/certificate/{1}".format(base_url, order_id) certificate_url = "{0}/platform/cis/certificate/{1}/download".format(base_url, order_id)
session.headers.update({"Accept": "application/x-pem-file"}) session.headers.update({"Accept": "application/x-pkcs7-certificates"})
response = session.get(certificate_url) response = session.get(certificate_url)
if response.status_code == 404: if response.status_code == 404:
raise Exception("Order not in issued state.") raise Exception("Order not in issued state.")
return response.content cert_chain_pem = convert_pkcs7_bytes_to_pem(response.content)
if len(cert_chain_pem) < 3:
raise Exception("Missing the certificate chain")
return cert_chain_pem
class DigiCertSourcePlugin(SourcePlugin): class DigiCertSourcePlugin(SourcePlugin):
@ -447,7 +450,6 @@ class DigiCertCISSourcePlugin(SourcePlugin):
"DIGICERT_CIS_API_KEY", "DIGICERT_CIS_API_KEY",
"DIGICERT_CIS_URL", "DIGICERT_CIS_URL",
"DIGICERT_CIS_ROOTS", "DIGICERT_CIS_ROOTS",
"DIGICERT_CIS_INTERMEDIATES",
"DIGICERT_CIS_PROFILE_NAMES", "DIGICERT_CIS_PROFILE_NAMES",
] ]
validate_conf(current_app, required_vars) validate_conf(current_app, required_vars)
@ -522,7 +524,6 @@ class DigiCertCISIssuerPlugin(IssuerPlugin):
"DIGICERT_CIS_API_KEY", "DIGICERT_CIS_API_KEY",
"DIGICERT_CIS_URL", "DIGICERT_CIS_URL",
"DIGICERT_CIS_ROOTS", "DIGICERT_CIS_ROOTS",
"DIGICERT_CIS_INTERMEDIATES",
"DIGICERT_CIS_PROFILE_NAMES", "DIGICERT_CIS_PROFILE_NAMES",
] ]
@ -552,22 +553,15 @@ class DigiCertCISIssuerPlugin(IssuerPlugin):
data = handle_cis_response(response) data = handle_cis_response(response)
# retrieve certificate # retrieve certificate
certificate_pem = get_cis_certificate(self.session, base_url, data["id"]) certificate_chain_pem = get_cis_certificate(self.session, base_url, data["id"])
self.session.headers.pop("Accept") self.session.headers.pop("Accept")
end_entity = pem.parse(certificate_pem)[0] end_entity = certificate_chain_pem[0]
intermediate = certificate_chain_pem[1]
if "ECC" in issuer_options["key_type"]:
return (
"\n".join(str(end_entity).splitlines()),
current_app.config.get("DIGICERT_ECC_CIS_INTERMEDIATES", {}).get(issuer_options['authority'].name),
data["id"],
)
# By default return RSA
return ( return (
"\n".join(str(end_entity).splitlines()), "\n".join(str(end_entity).splitlines()),
current_app.config.get("DIGICERT_CIS_INTERMEDIATES", {}).get(issuer_options['authority'].name), "\n".join(str(intermediate).splitlines()),
data["id"], data["id"],
) )

View File

@ -34,8 +34,7 @@ def determine_end_date(end_date):
if not end_date: if not end_date:
end_date = max_validity_end end_date = max_validity_end
elif end_date > max_validity_end:
if end_date > max_validity_end:
end_date = max_validity_end end_date = max_validity_end
return end_date.format('YYYY-MM-DD') return end_date.format('YYYY-MM-DD')

View File

@ -3,6 +3,7 @@ from unittest.mock import patch, Mock
import arrow import arrow
from cryptography import x509 from cryptography import x509
from lemur.plugins.lemur_entrust import plugin from lemur.plugins.lemur_entrust import plugin
from freezegun import freeze_time
def config_mock(*args): def config_mock(*args):
@ -21,11 +22,18 @@ def config_mock(*args):
return values[args[0]] return values[args[0]]
@patch("lemur.plugins.lemur_digicert.plugin.current_app")
def test_determine_end_date(mock_current_app):
with freeze_time(time_to_freeze=arrow.get(2016, 11, 3).datetime):
assert arrow.get(2017, 12, 3).format('YYYY-MM-DD') == plugin.determine_end_date(0) # 1 year + 1 month
assert arrow.get(2017, 3, 5).format('YYYY-MM-DD') == plugin.determine_end_date(arrow.get(2017, 3, 5))
assert arrow.get(2017, 12, 3).format('YYYY-MM-DD') == plugin.determine_end_date(arrow.get(2020, 5, 7))
@patch("lemur.plugins.lemur_entrust.plugin.current_app") @patch("lemur.plugins.lemur_entrust.plugin.current_app")
def test_process_options(mock_current_app, authority): def test_process_options(mock_current_app, authority):
mock_current_app.config.get = Mock(side_effect=config_mock) mock_current_app.config.get = Mock(side_effect=config_mock)
plugin.determine_end_date = Mock(return_value=arrow.get(2020, 10, 7).format('YYYY-MM-DD')) plugin.determine_end_date = Mock(return_value=arrow.get(2017, 11, 5).format('YYYY-MM-DD'))
authority.name = "Entrust" authority.name = "Entrust"
names = [u"one.example.com", u"two.example.com", u"three.example.com"] names = [u"one.example.com", u"two.example.com", u"three.example.com"]
options = { options = {
@ -35,7 +43,7 @@ def test_process_options(mock_current_app, authority):
"extensions": {"sub_alt_names": {"names": [x509.DNSName(x) for x in names]}}, "extensions": {"sub_alt_names": {"names": [x509.DNSName(x) for x in names]}},
"organization": "Example, Inc.", "organization": "Example, Inc.",
"organizational_unit": "Example Org", "organizational_unit": "Example Org",
"validity_end": arrow.get(2020, 10, 7), "validity_end": arrow.utcnow().shift(years=1, months=+1),
"authority": authority, "authority": authority,
} }
@ -43,7 +51,7 @@ def test_process_options(mock_current_app, authority):
"signingAlg": "SHA-2", "signingAlg": "SHA-2",
"eku": "SERVER_AND_CLIENT_AUTH", "eku": "SERVER_AND_CLIENT_AUTH",
"certType": "ADVANTAGE_SSL", "certType": "ADVANTAGE_SSL",
"certExpiryDate": arrow.get(2020, 10, 7).format('YYYY-MM-DD'), "certExpiryDate": arrow.get(2017, 11, 5).format('YYYY-MM-DD'),
"tracking": { "tracking": {
"requesterName": mock_current_app.config.get("ENTRUST_NAME"), "requesterName": mock_current_app.config.get("ENTRUST_NAME"),
"requesterEmail": mock_current_app.config.get("ENTRUST_EMAIL"), "requesterEmail": mock_current_app.config.get("ENTRUST_EMAIL"),

View File

@ -128,3 +128,11 @@ def render(args):
query = database.filter(query, Role, terms) query = database.filter(query, Role, terms)
return database.sort_and_page(query, Role, args) return database.sort_and_page(query, Role, args)
def get_or_create(role_name, description):
role = get_by_name(role_name)
if not role:
role = create(name=role_name, description=description)
return role

View File

@ -124,4 +124,8 @@ angular.module('lemur')
opened: false opened: false
}; };
$scope.populateSubjectEmail = function () {
$scope.authority.email = $scope.authority.owner;
};
}); });

View File

@ -26,8 +26,7 @@
Location Location
</label> </label>
<div class="col-sm-10"> <div class="col-sm-10">
<input name="location" ng-model="authority.location" placeholder="Location" class="form-control" required/> <input name="location" ng-model="authority.location" placeholder="Location" class="form-control"/>
<p ng-show="dnForm.location.$invalid && !dnForm.location.$pristine" class="help-block">You must enter a location</p>
</div> </div>
</div> </div>
<div class="form-group" <div class="form-group"
@ -49,6 +48,15 @@
<input name="organizationalUnit" ng-model="authority.organizationalUnit" placeholder="Organizational Unit" class="form-control"/> <input name="organizationalUnit" ng-model="authority.organizationalUnit" placeholder="Organizational Unit" class="form-control"/>
</div> </div>
</div> </div>
<div class="form-group"
ng-class="{'has-error': dnForm.email.$invalid, 'has-success': !dnForm.$invalid&&dnForm.email.$dirty}">
<label class="control-label col-sm-2">
Email
</label>
<div class="col-sm-10">
<input type="email" name="email" ng-model="authority.email" placeholder="Email Address" class="form-control"/>
</div>
</div>
</div> </div>
</form> </form>

View File

@ -21,7 +21,7 @@
<div class="col-sm-10"> <div class="col-sm-10">
<input type="email" name="owner" ng-model="authority.owner" placeholder="TeamDL@example.com" <input type="email" name="owner" ng-model="authority.owner" placeholder="TeamDL@example.com"
uib-tooltip="This is the authorities team distribution list or the main point of contact for this authority" uib-tooltip="This is the authorities team distribution list or the main point of contact for this authority"
class="form-control" required/> class="form-control" ng-change="populateSubjectEmail()" required/>
<p ng-show="trackingForm.owner.$invalid && !trackingForm.owner.$pristine" class="help-block">You must <p ng-show="trackingForm.owner.$invalid && !trackingForm.owner.$pristine" class="help-block">You must
enter an Certificate Authority owner</p> enter an Certificate Authority owner</p>
</div> </div>

View File

@ -38,9 +38,7 @@
Location Location
</label> </label>
<div class="col-sm-10"> <div class="col-sm-10">
<input name="location" ng-model="certificate.location" placeholder="Location" class="form-control" required/> <input name="location" ng-model="certificate.location" placeholder="Location" class="form-control"/>
<p ng-show="dnForm.location.$invalid && !dnForm.location.$pristine" class="help-block">You must enter a
location</p>
</div> </div>
</div> </div>
<div class="form-group" <div class="form-group"

View File

@ -301,7 +301,7 @@ angular.module('lemur')
}; };
CertificateService.updateNotify = function (certificate) { CertificateService.updateNotify = function (certificate) {
return certificate.put(); return certificate.post();
}; };
CertificateService.export = function (certificate) { CertificateService.export = function (certificate) {

View File

@ -37,9 +37,9 @@ LEMUR_ENCRYPTION_KEYS = base64.urlsafe_b64encode(get_random_secret(length=32).en
# List of domain regular expressions that non-admin users can issue # List of domain regular expressions that non-admin users can issue
LEMUR_WHITELISTED_DOMAINS = [ LEMUR_WHITELISTED_DOMAINS = [
"^[a-zA-Z0-9-]+\.example\.com$", r"^[a-zA-Z0-9-]+\.example\.com$",
"^[a-zA-Z0-9-]+\.example\.org$", r"^[a-zA-Z0-9-]+\.example\.org$",
"^example\d+\.long\.com$", r"^example\d+\.long\.com$",
] ]
# Mail Server # Mail Server
@ -99,7 +99,6 @@ DIGICERT_CIS_URL = "mock://www.digicert.com"
DIGICERT_CIS_PROFILE_NAMES = {"sha2-rsa-ecc-root": "ssl_plus"} DIGICERT_CIS_PROFILE_NAMES = {"sha2-rsa-ecc-root": "ssl_plus"}
DIGICERT_CIS_API_KEY = "api-key" DIGICERT_CIS_API_KEY = "api-key"
DIGICERT_CIS_ROOTS = {"root": "ROOT"} DIGICERT_CIS_ROOTS = {"root": "ROOT"}
DIGICERT_CIS_INTERMEDIATES = {"inter": "INTERMEDIATE_CA_CERT"}
VERISIGN_URL = "http://example.com" VERISIGN_URL = "http://example.com"
VERISIGN_PEM_PATH = "~/" VERISIGN_PEM_PATH = "~/"

View File

@ -154,7 +154,7 @@ def test_get_certificate_primitives(certificate):
with freeze_time(datetime.date(year=2016, month=10, day=30)): with freeze_time(datetime.date(year=2016, month=10, day=30)):
primitives = get_certificate_primitives(certificate) primitives = get_certificate_primitives(certificate)
assert len(primitives) == 26 assert len(primitives) == 25
assert (primitives["key_type"] == "RSA2048") assert (primitives["key_type"] == "RSA2048")
@ -180,7 +180,10 @@ def test_certificate_edit_schema(session):
input_data = {"owner": "bob@example.com"} input_data = {"owner": "bob@example.com"}
data, errors = CertificateEditInputSchema().load(input_data) data, errors = CertificateEditInputSchema().load(input_data)
assert not errors
assert len(data["notifications"]) == 3 assert len(data["notifications"]) == 3
assert data["roles"][0].name == input_data["owner"]
def test_authority_key_identifier_schema(): def test_authority_key_identifier_schema():
@ -254,17 +257,18 @@ def test_certificate_input_schema(client, authority):
"validityStart": arrow.get(2018, 11, 9).isoformat(), "validityStart": arrow.get(2018, 11, 9).isoformat(),
"validityEnd": arrow.get(2019, 11, 9).isoformat(), "validityEnd": arrow.get(2019, 11, 9).isoformat(),
"dnsProvider": None, "dnsProvider": None,
"location": "A Place"
} }
data, errors = CertificateInputSchema().load(input_data) data, errors = CertificateInputSchema().load(input_data)
assert not errors assert not errors
assert data["authority"].id == authority.id assert data["authority"].id == authority.id
assert data["location"] == "A Place"
# make sure the defaults got set # make sure the defaults got set
assert data["common_name"] == "test.example.com" assert data["common_name"] == "test.example.com"
assert data["country"] == "US" assert data["country"] == "US"
assert data["location"] == "Los Gatos"
assert len(data.keys()) == 19 assert len(data.keys()) == 19
@ -921,20 +925,26 @@ def test_certificate_get_body(client):
@pytest.mark.parametrize( @pytest.mark.parametrize(
"token,status", "token,status",
[ [
(VALID_USER_HEADER_TOKEN, 405), (VALID_USER_HEADER_TOKEN, 403),
(VALID_ADMIN_HEADER_TOKEN, 405), (VALID_ADMIN_HEADER_TOKEN, 200),
(VALID_ADMIN_API_TOKEN, 405), (VALID_ADMIN_API_TOKEN, 200),
("", 405), ("", 401),
], ],
) )
def test_certificate_post(client, token, status): def test_certificate_post_update_notify(client, certificate, token, status):
assert ( # negate the current notify flag and pass it to update POST call to flip the notify
client.post( toggled_notify = not certificate.notify
api.url_for(Certificates, certificate_id=1), data={}, headers=token
).status_code response = client.post(
== status api.url_for(Certificates, certificate_id=certificate.id),
data=json.dumps({"notify": toggled_notify}),
headers=token
) )
assert response.status_code == status
if status == 200:
assert response.json.get("notify") == toggled_notify
@pytest.mark.parametrize( @pytest.mark.parametrize(
"token,status", "token,status",
@ -963,6 +973,9 @@ def test_certificate_put_with_data(client, certificate, issuer_plugin):
headers=VALID_ADMIN_HEADER_TOKEN, headers=VALID_ADMIN_HEADER_TOKEN,
) )
assert resp.status_code == 200 assert resp.status_code == 200
assert len(certificate.notifications) == 3
assert certificate.roles[0].name == "bob@example.com"
assert certificate.notify
@pytest.mark.parametrize( @pytest.mark.parametrize(

View File

@ -10,6 +10,7 @@ from lemur.tests.vectors import (
ECDSA_SECP384r1_CERT, ECDSA_SECP384r1_CERT,
ECDSA_SECP384r1_CERT_STR, ECDSA_SECP384r1_CERT_STR,
DSA_CERT, DSA_CERT,
CERT_CHAIN_PKCS7_PEM
) )
@ -114,3 +115,16 @@ def test_get_key_type_from_certificate():
from lemur.common.utils import get_key_type_from_certificate from lemur.common.utils import get_key_type_from_certificate
assert (get_key_type_from_certificate(SAN_CERT_STR) == "RSA2048") assert (get_key_type_from_certificate(SAN_CERT_STR) == "RSA2048")
assert (get_key_type_from_certificate(ECDSA_SECP384r1_CERT_STR) == "ECCSECP384R1") assert (get_key_type_from_certificate(ECDSA_SECP384r1_CERT_STR) == "ECCSECP384R1")
def test_convert_pkcs7_bytes_to_pem():
from lemur.common.utils import convert_pkcs7_bytes_to_pem
from lemur.common.utils import parse_certificate
cert_chain = convert_pkcs7_bytes_to_pem(CERT_CHAIN_PKCS7_PEM)
assert(len(cert_chain) == 3)
leaf = cert_chain[1]
root = cert_chain[2]
assert(parse_certificate("\n".join(str(root).splitlines())) == ROOTCA_CERT)
assert (parse_certificate("\n".join(str(leaf).splitlines())) == INTERMEDIATE_CERT)

View File

@ -512,3 +512,78 @@ BglghkgBZQMEAwIDMAAwLQIVANubSNMSLt8plN9ZV3cp4pe3lMYCAhQPLLE7rTgm
-----END CERTIFICATE----- -----END CERTIFICATE-----
""" """
DSA_CERT = parse_certificate(DSA_CERT_STR) DSA_CERT = parse_certificate(DSA_CERT_STR)
CERT_CHAIN_PKCS7_STR = """
-----BEGIN PKCS7-----
MIIMfwYJKoZIhvcNAQcCoIIMcDCCDGwCAQExADALBgkqhkiG9w0BBwGgggxSMIIE
FjCCAv6gAwIBAgIQbIbX/Ap0Roqzf5HeN5akmzANBgkqhkiG9w0BAQsFADCBpDEq
MCgGA1UEAwwhTGVtdXJUcnVzdCBVbml0dGVzdHMgUm9vdCBDQSAyMDE4MSMwIQYD
VQQKDBpMZW11clRydXN0IEVudGVycHJpc2VzIEx0ZDEmMCQGA1UECwwdVW5pdHRl
c3RpbmcgT3BlcmF0aW9ucyBDZW50ZXIxCzAJBgNVBAYTAkVFMQwwCgYDVQQIDANO
L0ExDjAMBgNVBAcMBUVhcnRoMB4XDTE3MTIzMTIyMDAwMFoXDTQ3MTIzMTIyMDAw
MFowgaQxKjAoBgNVBAMMIUxlbXVyVHJ1c3QgVW5pdHRlc3RzIFJvb3QgQ0EgMjAx
ODEjMCEGA1UECgwaTGVtdXJUcnVzdCBFbnRlcnByaXNlcyBMdGQxJjAkBgNVBAsM
HVVuaXR0ZXN0aW5nIE9wZXJhdGlvbnMgQ2VudGVyMQswCQYDVQQGEwJFRTEMMAoG
A1UECAwDTi9BMQ4wDAYDVQQHDAVFYXJ0aDCCASIwDQYJKoZIhvcNAQEBBQADggEP
ADCCAQoCggEBAL8laXtLXyM64t5dz2B9q+4VvOsChefBi2PlGudqxDuRN3l0Kmcf
un6x2Gng24pTlGdtmiTEWA0a2F8HRLv4YBWhuYleVeBPtf1fF1/SuYgkJOWT7S5q
k/od/tUOLHS0Y067st3FydnFQTKpAuYveEkxleFrMS8hX8cuEgbER+8ybiXKn4Gs
yM/om6lsTyBoaLp5yTAoQb4jAWDbiz1xcjPSkvH2lm7rLGtKoylCYwxRsMh2nZcR
r1OXVhYHXwpYHVB/jVAjy7PAWQ316hi6mpPYbBV+yfn2GUfGuytqyoXLEsrM3iEE
AkU0mJjQmYsCDM3r7ONHTM+UFEk47HCZJccCAwEAAaNCMEAwDwYDVR0TAQH/BAUw
AwEB/zAOBgNVHQ8BAf8EBAMCAQYwHQYDVR0OBBYEFFL12SFeOTTDdGKsHKozeByG
HY6nMA0GCSqGSIb3DQEBCwUAA4IBAQAJfe0/uAHobkxth38dqrSFmTo+D5/TMlRt
3hdgjlah6sD2+/DObCyut/XhQWCgTNWyRi4xTKgLh5KSoeJ9EMkADGEgDkU2vjBg
5FmGZsxg6bqjxehK+2HvASJoTH8r41xmTioav7a2i3wNhaNSntw2QRTQBQEDOIzH
RpPDQ2quErjA8nSifE2xmAAr3g+FuookTTJuv37s2cS59zRYsg+WC3+TtPpRssvo
bJ6Xe2D4cCVjUmsqtFEztMgdqgmlcWyGdUKeXdi7CMoeTb4uO+9qRQq46wYWn7K1
z+W0Kp5yhnnPAoOioAP4vjASDx3z3RnLaZvMmcO7YdCIwhE5oGV0MIIEGjCCAwKg
AwIBAgIRAJ96dbOdrkw/lSTGiwbaagwwDQYJKoZIhvcNAQELBQAwgaQxKjAoBgNV
BAMMIUxlbXVyVHJ1c3QgVW5pdHRlc3RzIFJvb3QgQ0EgMjAxODEjMCEGA1UECgwa
TGVtdXJUcnVzdCBFbnRlcnByaXNlcyBMdGQxJjAkBgNVBAsMHVVuaXR0ZXN0aW5n
IE9wZXJhdGlvbnMgQ2VudGVyMQswCQYDVQQGEwJFRTEMMAoGA1UECAwDTi9BMQ4w
DAYDVQQHDAVFYXJ0aDAeFw0xNzEyMzEyMjAwMDBaFw00NzEyMzEyMjAwMDBaMIGn
MS0wKwYDVQQDDCRMZW11clRydXN0IFVuaXR0ZXN0cyBDbGFzcyAxIENBIDIwMTgx
IzAhBgNVBAoMGkxlbXVyVHJ1c3QgRW50ZXJwcmlzZXMgTHRkMSYwJAYDVQQLDB1V
bml0dGVzdGluZyBPcGVyYXRpb25zIENlbnRlcjELMAkGA1UEBhMCRUUxDDAKBgNV
BAgMA04vQTEOMAwGA1UEBwwFRWFydGgwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAw
ggEKAoIBAQDR+qNdfNsLhGvgw3IgCQNakL2B9dpQtkVnvAXhdRZqJETm/tHLkGvO
NWTXAwGdoiKv6+0j3I5InUsW+wzUPewcfj+PLNu4mFMq8jH/gPhTElKiAztPRdm8
QKchvrqiaU6uEbia8ClM6uPpIi8StxE1aJRYL03p0WeMJjJPrsl6eSSdpR4qL69G
Td1n5je9OuWAcn5utXXnt/jO4vNeFRjlGp/0n3JmTDd9w4vtAyY9UrdGgo37eBmi
6mXt5J9i//NenhaiOVU81RqxZM2Jt1kkg2WSjcqcIQfBEWp9StG46VmHLaL+9/v2
XAV3tL1VilJGj6PoFMb4gY5MXthfGSiXAgMBAAGjQjBAMA8GA1UdEwEB/wQFMAMB
Af8wDgYDVR0PAQH/BAQDAgEGMB0GA1UdDgQWBBQstpQr0iMBVfv0lODIsMgT9+9o
ezANBgkqhkiG9w0BAQsFAAOCAQEASYQbv1Qwb5zES6Gb5LEhrAcH81ZB2uIpKd3K
i6AS4fLJVymMGkUs0RZjt39Ep4qX1zf0hn82Yh9YwRalrkgu+tzKrp0JgegNe6+g
yFRrJC0SIGA4zc3M02m/n4tdaouU2lp6jhmWruL3g25ZkgbQ8LO2zjpSMtblR2eu
vR2+bI7TepklyG71qx5y6/N8x5PT+hnTlleiZeE/ji9D96MZlpWB4kBihekWmxup
tED22z/tpQtac+hPBNgt8z1uFVEYN2rKEcCE7V6Qk7icS+M4Vb7M3D8kLyWDubs9
Yy3l0EWjOXQXxEhTaKEm4gSuY/j+Y35bBVkA2Fcyuq7msiTgrzCCBBYwggL+oAMC
AQICEGyG1/wKdEaKs3+R3jeWpJswDQYJKoZIhvcNAQELBQAwgaQxKjAoBgNVBAMM
IUxlbXVyVHJ1c3QgVW5pdHRlc3RzIFJvb3QgQ0EgMjAxODEjMCEGA1UECgwaTGVt
dXJUcnVzdCBFbnRlcnByaXNlcyBMdGQxJjAkBgNVBAsMHVVuaXR0ZXN0aW5nIE9w
ZXJhdGlvbnMgQ2VudGVyMQswCQYDVQQGEwJFRTEMMAoGA1UECAwDTi9BMQ4wDAYD
VQQHDAVFYXJ0aDAeFw0xNzEyMzEyMjAwMDBaFw00NzEyMzEyMjAwMDBaMIGkMSow
KAYDVQQDDCFMZW11clRydXN0IFVuaXR0ZXN0cyBSb290IENBIDIwMTgxIzAhBgNV
BAoMGkxlbXVyVHJ1c3QgRW50ZXJwcmlzZXMgTHRkMSYwJAYDVQQLDB1Vbml0dGVz
dGluZyBPcGVyYXRpb25zIENlbnRlcjELMAkGA1UEBhMCRUUxDDAKBgNVBAgMA04v
QTEOMAwGA1UEBwwFRWFydGgwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIB
AQC/JWl7S18jOuLeXc9gfavuFbzrAoXnwYtj5RrnasQ7kTd5dCpnH7p+sdhp4NuK
U5RnbZokxFgNGthfB0S7+GAVobmJXlXgT7X9Xxdf0rmIJCTlk+0uapP6Hf7VDix0
tGNOu7LdxcnZxUEyqQLmL3hJMZXhazEvIV/HLhIGxEfvMm4lyp+BrMjP6JupbE8g
aGi6eckwKEG+IwFg24s9cXIz0pLx9pZu6yxrSqMpQmMMUbDIdp2XEa9Tl1YWB18K
WB1Qf41QI8uzwFkN9eoYupqT2GwVfsn59hlHxrsrasqFyxLKzN4hBAJFNJiY0JmL
AgzN6+zjR0zPlBRJOOxwmSXHAgMBAAGjQjBAMA8GA1UdEwEB/wQFMAMBAf8wDgYD
VR0PAQH/BAQDAgEGMB0GA1UdDgQWBBRS9dkhXjk0w3RirByqM3gchh2OpzANBgkq
hkiG9w0BAQsFAAOCAQEACX3tP7gB6G5MbYd/Haq0hZk6Pg+f0zJUbd4XYI5WoerA
9vvwzmwsrrf14UFgoEzVskYuMUyoC4eSkqHifRDJAAxhIA5FNr4wYORZhmbMYOm6
o8XoSvth7wEiaEx/K+NcZk4qGr+2tot8DYWjUp7cNkEU0AUBAziMx0aTw0NqrhK4
wPJ0onxNsZgAK94PhbqKJE0ybr9+7NnEufc0WLIPlgt/k7T6UbLL6Gyel3tg+HAl
Y1JrKrRRM7TIHaoJpXFshnVCnl3YuwjKHk2+LjvvakUKuOsGFp+ytc/ltCqecoZ5
zwKDoqAD+L4wEg8d890Zy2mbzJnDu2HQiMIROaBldKEAMQA=
-----END PKCS7-----
"""
CERT_CHAIN_PKCS7_PEM = CERT_CHAIN_PKCS7_STR.encode('utf-8')