commit
cd29b2b870
|
@ -6,6 +6,8 @@
|
|||
:license: Apache, see LICENSE for more details.
|
||||
.. moduleauthor:: Kevin Glisson <kglisson@netflix.com>
|
||||
"""
|
||||
import json
|
||||
|
||||
from sqlalchemy.orm import relationship
|
||||
from sqlalchemy import (
|
||||
Column,
|
||||
|
@ -80,5 +82,21 @@ class Authority(db.Model):
|
|||
def plugin(self):
|
||||
return plugins.get(self.plugin_name)
|
||||
|
||||
@property
|
||||
def is_cab_compliant(self):
|
||||
"""
|
||||
Parse the options to find whether authority is CAB Forum Compliant,
|
||||
i.e., adhering to the CA/Browser Forum Baseline Requirements.
|
||||
Returns None if option is not available
|
||||
"""
|
||||
if not self.options:
|
||||
return None
|
||||
|
||||
for option in json.loads(self.options):
|
||||
if "name" in option and option["name"] == 'cab_compliant':
|
||||
return option["value"]
|
||||
|
||||
return None
|
||||
|
||||
def __repr__(self):
|
||||
return "Authority(name={name})".format(name=self.name)
|
||||
|
|
|
@ -139,6 +139,7 @@ class AuthorityNestedOutputSchema(LemurOutputSchema):
|
|||
plugin = fields.Nested(PluginOutputSchema)
|
||||
active = fields.Boolean()
|
||||
authority_certificate = fields.Nested(RootAuthorityCertificateOutputSchema, only=["max_issuance_days", "default_validity_days"])
|
||||
is_cab_compliant = fields.Boolean()
|
||||
|
||||
|
||||
authority_update_schema = AuthorityUpdateSchema()
|
||||
|
|
|
@ -8,7 +8,7 @@
|
|||
from flask import current_app
|
||||
from flask_restful import inputs
|
||||
from flask_restful.reqparse import RequestParser
|
||||
from marshmallow import fields, validate, validates_schema, post_load, pre_load
|
||||
from marshmallow import fields, validate, validates_schema, post_load, pre_load, post_dump
|
||||
from marshmallow.exceptions import ValidationError
|
||||
|
||||
from lemur.authorities.schemas import AuthorityNestedOutputSchema
|
||||
|
@ -332,6 +332,27 @@ class CertificateOutputSchema(LemurOutputSchema):
|
|||
)
|
||||
rotation_policy = fields.Nested(RotationPolicyNestedOutputSchema)
|
||||
|
||||
country = fields.String()
|
||||
location = fields.String()
|
||||
state = fields.String()
|
||||
organization = fields.String()
|
||||
organizational_unit = fields.String()
|
||||
|
||||
@post_dump
|
||||
def handle_subject_details(self, data):
|
||||
# Remove subject details if authority is CA/Browser Forum compliant. The code will use default set of values in that case.
|
||||
# If CA/Browser Forum compliance of an authority is unknown (None), it is safe to fallback to default values. Thus below
|
||||
# condition checks for 'not False' ==> 'True or None'
|
||||
if data.get("authority"):
|
||||
is_cab_compliant = data.get("authority").get("isCabCompliant")
|
||||
|
||||
if is_cab_compliant is not False:
|
||||
data.pop("country", None)
|
||||
data.pop("state", None)
|
||||
data.pop("location", None)
|
||||
data.pop("organization", None)
|
||||
data.pop("organizational_unit", None)
|
||||
|
||||
|
||||
class CertificateShortOutputSchema(LemurOutputSchema):
|
||||
id = fields.Integer()
|
||||
|
|
|
@ -175,7 +175,6 @@ def map_cis_fields(options, csr):
|
|||
},
|
||||
"organization": {
|
||||
"name": options["organization"],
|
||||
"units": [options["organizational_unit"]],
|
||||
},
|
||||
}
|
||||
# possibility to default to a SIGNING_ALGORITHM for a given profile
|
||||
|
|
|
@ -121,7 +121,7 @@ def test_map_cis_fields_with_validity_years(mock_current_app, authority):
|
|||
"csr": CSR_STR,
|
||||
"additional_dns_names": names,
|
||||
"signature_hash": "sha256",
|
||||
"organization": {"name": "Example, Inc.", "units": ["Example Org"]},
|
||||
"organization": {"name": "Example, Inc."},
|
||||
"validity": {
|
||||
"valid_to": arrow.get(2018, 11, 3).format("YYYY-MM-DDTHH:MM") + "Z"
|
||||
},
|
||||
|
@ -157,7 +157,7 @@ def test_map_cis_fields_with_validity_end_and_start(mock_current_app, app, autho
|
|||
"csr": CSR_STR,
|
||||
"additional_dns_names": names,
|
||||
"signature_hash": "sha256",
|
||||
"organization": {"name": "Example, Inc.", "units": ["Example Org"]},
|
||||
"organization": {"name": "Example, Inc."},
|
||||
"validity": {
|
||||
"valid_to": arrow.get(2017, 5, 7).format("YYYY-MM-DDTHH:MM") + "Z"
|
||||
},
|
||||
|
|
|
@ -255,9 +255,6 @@ angular.module('lemur')
|
|||
$scope.certificate.replacedBy = []; // should not clone 'replaced by' info
|
||||
$scope.certificate.removeReplaces(); // should not clone 'replacement cert' info
|
||||
|
||||
if(!$scope.certificate.keyType) {
|
||||
$scope.certificate.keyType = 'RSA2048'; // default algo to select during clone if backend did not return algo
|
||||
}
|
||||
CertificateService.getDefaults($scope.certificate);
|
||||
});
|
||||
|
||||
|
|
|
@ -289,6 +289,11 @@ angular.module('lemur')
|
|||
if (certificate.dnsProviderId) {
|
||||
certificate.dnsProvider = {id: certificate.dnsProviderId};
|
||||
}
|
||||
|
||||
if(!certificate.keyType) {
|
||||
certificate.keyType = 'RSA2048'; // default algo to select during clone if backend did not return algo
|
||||
}
|
||||
|
||||
});
|
||||
};
|
||||
|
||||
|
|
|
@ -9,7 +9,6 @@ from cryptography import x509
|
|||
from cryptography.hazmat.backends import default_backend
|
||||
from marshmallow import ValidationError
|
||||
from freezegun import freeze_time
|
||||
# from mock import patch
|
||||
from unittest.mock import patch
|
||||
|
||||
from lemur.certificates.service import create_csr
|
||||
|
@ -171,10 +170,43 @@ def test_certificate_output_schema(session, certificate, issuer_plugin):
|
|||
) as wrapper:
|
||||
data, errors = CertificateOutputSchema().dump(certificate)
|
||||
assert data["issuer"] == "LemurTrustUnittestsClass1CA2018"
|
||||
assert data["distinguishedName"] == "L=Earth,ST=N/A,C=EE,OU=Karate Lessons,O=Daniel San & co,CN=san.example.org"
|
||||
# Authority does not have 'cab_compliant', thus subject details should not be returned
|
||||
assert "organization" not in data
|
||||
|
||||
assert wrapper.call_count == 1
|
||||
|
||||
|
||||
def test_certificate_output_schema_subject_details(session, certificate, issuer_plugin):
|
||||
from lemur.certificates.schemas import CertificateOutputSchema
|
||||
from lemur.authorities.service import update_options
|
||||
|
||||
# Mark authority as non-cab-compliant
|
||||
update_options(certificate.authority.id, '[{"name": "cab_compliant","value":false}]')
|
||||
|
||||
data, errors = CertificateOutputSchema().dump(certificate)
|
||||
assert not errors
|
||||
assert data["issuer"] == "LemurTrustUnittestsClass1CA2018"
|
||||
assert data["distinguishedName"] == "L=Earth,ST=N/A,C=EE,OU=Karate Lessons,O=Daniel San & co,CN=san.example.org"
|
||||
|
||||
# Original subject details should be returned because of cab_compliant option update above
|
||||
assert data["country"] == "EE"
|
||||
assert data["state"] == "N/A"
|
||||
assert data["location"] == "Earth"
|
||||
assert data["organization"] == "Daniel San & co"
|
||||
assert data["organizationalUnit"] == "Karate Lessons"
|
||||
|
||||
# Mark authority as cab-compliant
|
||||
update_options(certificate.authority.id, '[{"name": "cab_compliant","value":true}]')
|
||||
data, errors = CertificateOutputSchema().dump(certificate)
|
||||
assert not errors
|
||||
assert "country" not in data
|
||||
assert "state" not in data
|
||||
assert "location" not in data
|
||||
assert "organization" not in data
|
||||
assert "organizationalUnit" not in data
|
||||
|
||||
|
||||
def test_certificate_edit_schema(session):
|
||||
from lemur.certificates.schemas import CertificateEditInputSchema
|
||||
|
||||
|
@ -759,12 +791,22 @@ def test_reissue_certificate(
|
|||
issuer_plugin, crypto_authority, certificate, logged_in_user
|
||||
):
|
||||
from lemur.certificates.service import reissue_certificate
|
||||
from lemur.authorities.service import update_options
|
||||
from lemur.tests.conf import LEMUR_DEFAULT_ORGANIZATION
|
||||
|
||||
# test-authority would return a mismatching private key, so use 'cryptography-issuer' plugin instead.
|
||||
certificate.authority = crypto_authority
|
||||
new_cert = reissue_certificate(certificate)
|
||||
assert new_cert
|
||||
assert (new_cert.key_type == "RSA2048")
|
||||
assert new_cert.key_type == "RSA2048"
|
||||
assert new_cert.organization != certificate.organization
|
||||
# Check for default value since authority does not have cab_compliant option set
|
||||
assert new_cert.organization == LEMUR_DEFAULT_ORGANIZATION
|
||||
|
||||
# update cab_compliant option to false for crypto_authority to maintain subject details
|
||||
update_options(crypto_authority.id, '[{"name": "cab_compliant","value":false}]')
|
||||
new_cert = reissue_certificate(certificate)
|
||||
assert new_cert.organization == certificate.organization
|
||||
|
||||
|
||||
def test_create_csr():
|
||||
|
@ -921,6 +963,14 @@ def test_certificate_get_body(client):
|
|||
"CN=LemurTrust Unittests Class 1 CA 2018"
|
||||
)
|
||||
|
||||
# No authority details are provided in this test, no information about being cab_compliant is available.
|
||||
# Thus original subject details should be returned.
|
||||
assert response_body["country"] == "EE"
|
||||
assert response_body["state"] == "N/A"
|
||||
assert response_body["location"] == "Earth"
|
||||
assert response_body["organization"] == "LemurTrust Enterprises Ltd"
|
||||
assert response_body["organizationalUnit"] == "Unittesting Operations Center"
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"token,status",
|
||||
|
|
Loading…
Reference in New Issue