Merge pull request #2925 from Netflix/renewal_validity_01

adding default/max DigiCert validity dates
This commit is contained in:
csine-nflx 2020-03-03 17:36:46 -08:00 committed by GitHub
commit 38a7f3d43e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 201 additions and 152 deletions

View File

@ -14,21 +14,17 @@
.. moduleauthor:: Kevin Glisson <kglisson@netflix.com> .. moduleauthor:: Kevin Glisson <kglisson@netflix.com>
""" """
import json import json
import arrow import arrow
import requests
import pem import pem
from retrying import retry import requests
from flask import current_app
from cryptography import x509 from cryptography import x509
from flask import current_app
from lemur.extensions import metrics
from lemur.common.utils import validate_conf from lemur.common.utils import validate_conf
from lemur.plugins.bases import IssuerPlugin, SourcePlugin from lemur.extensions import metrics
from lemur.plugins import lemur_digicert as digicert from lemur.plugins import lemur_digicert as digicert
from lemur.plugins.bases import IssuerPlugin, SourcePlugin
from retrying import retry
def log_status_code(r, *args, **kwargs): def log_status_code(r, *args, **kwargs):
@ -64,24 +60,37 @@ def signature_hash(signing_algorithm):
raise Exception("Unsupported signing algorithm.") raise Exception("Unsupported signing algorithm.")
def determine_validity_years(end_date): def determine_validity_years(years):
"""Given an end date determine how many years into the future that date is. """Given an end date determine how many years into the future that date is.
:param years:
:return: validity in years
"""
default_years = current_app.config.get("DIGICERT_DEFAULT_VALIDITY", 1)
max_years = current_app.config.get("DIGICERT_MAX_VALIDITY", default_years)
if years > max_years:
return max_years
if years not in [1, 2, 3]:
return default_years
return years
def determine_end_date(end_date):
"""
Determine appropriate end date
:param end_date: :param end_date:
:return: str validity in years :return: validity_end
""" """
now = arrow.utcnow() default_years = current_app.config.get("DIGICERT_DEFAULT_VALIDITY", 1)
max_validity_end = arrow.utcnow().shift(years=current_app.config.get("DIGICERT_MAX_VALIDITY", default_years))
if end_date < now.shift(years=+1): if not end_date:
return 1 end_date = arrow.utcnow().shift(years=default_years)
elif end_date < now.shift(years=+2):
return 2
elif end_date < now.shift(years=+3):
return 3
raise Exception( if end_date > max_validity_end:
"DigiCert issued certificates cannot exceed three" " years in validity" end_date = max_validity_end
) return end_date
def get_additional_names(options): def get_additional_names(options):
@ -107,12 +116,6 @@ def map_fields(options, csr):
:param csr: :param csr:
:return: dict or valid DigiCert options :return: dict or valid DigiCert options
""" """
if not options.get("validity_years"):
if not options.get("validity_end"):
options["validity_years"] = current_app.config.get(
"DIGICERT_DEFAULT_VALIDITY", 1
)
data = dict( data = dict(
certificate={ certificate={
"common_name": options["common_name"], "common_name": options["common_name"],
@ -125,9 +128,11 @@ def map_fields(options, csr):
data["certificate"]["dns_names"] = get_additional_names(options) data["certificate"]["dns_names"] = get_additional_names(options)
if options.get("validity_years"): if options.get("validity_years"):
data["validity_years"] = options["validity_years"] data["validity_years"] = determine_validity_years(options.get("validity_years"))
elif options.get("validity_end"):
data["custom_expiration_date"] = determine_end_date(options.get("validity_end")).format("YYYY-MM-DD")
else: else:
data["custom_expiration_date"] = options["validity_end"].format("YYYY-MM-DD") data["validity_years"] = determine_validity_years(0)
if current_app.config.get("DIGICERT_PRIVATE", False): if current_app.config.get("DIGICERT_PRIVATE", False):
if "product" in data: if "product" in data:
@ -144,18 +149,15 @@ def map_cis_fields(options, csr):
:param options: :param options:
:param csr: :param csr:
:return: :return: data
""" """
if not options.get("validity_years"):
if not options.get("validity_end"): if options.get("validity_years"):
options["validity_end"] = arrow.utcnow().shift( validity_end = determine_end_date(arrow.utcnow().shift(years=options["validity_years"]))
years=current_app.config.get("DIGICERT_DEFAULT_VALIDITY", 1) elif options.get("validity_end"):
) validity_end = determine_end_date(options.get("validity_end"))
options["validity_years"] = determine_validity_years(options["validity_end"])
else: else:
options["validity_end"] = arrow.utcnow().shift( validity_end = determine_end_date(False)
years=options["validity_years"]
)
data = { data = {
"profile_name": current_app.config.get("DIGICERT_CIS_PROFILE_NAMES", {}).get(options['authority'].name), "profile_name": current_app.config.get("DIGICERT_CIS_PROFILE_NAMES", {}).get(options['authority'].name),
@ -164,7 +166,7 @@ def map_cis_fields(options, csr):
"csr": csr, "csr": csr,
"signature_hash": signature_hash(options.get("signing_algorithm")), "signature_hash": signature_hash(options.get("signing_algorithm")),
"validity": { "validity": {
"valid_to": options["validity_end"].format("YYYY-MM-DDTHH:MM") + "Z" "valid_to": validity_end.format("YYYY-MM-DDTHH:MM") + "Z"
}, },
"organization": { "organization": {
"name": options["organization"], "name": options["organization"],
@ -173,7 +175,8 @@ def map_cis_fields(options, csr):
} }
# possibility to default to a SIGNING_ALGORITHM for a given profile # possibility to default to a SIGNING_ALGORITHM for a given profile
if current_app.config.get("DIGICERT_CIS_SIGNING_ALGORITHMS", {}).get(options['authority'].name): if current_app.config.get("DIGICERT_CIS_SIGNING_ALGORITHMS", {}).get(options['authority'].name):
data["signature_hash"] = current_app.config.get("DIGICERT_CIS_SIGNING_ALGORITHMS", {}).get(options['authority'].name) data["signature_hash"] = current_app.config.get("DIGICERT_CIS_SIGNING_ALGORITHMS", {}).get(
options['authority'].name)
return data return data

View File

@ -1,117 +1,125 @@
import pytest
import arrow
import json import json
from unittest.mock import patch
from freezegun import freeze_time
from lemur.tests.vectors import CSR_STR
import arrow
import pytest
from cryptography import x509 from cryptography import x509
from freezegun import freeze_time
from lemur.plugins.lemur_digicert import plugin
from lemur.tests.vectors import CSR_STR
from mock import Mock, patch
def test_map_fields_with_validity_end_and_start(app): def config_mock(*args):
from lemur.plugins.lemur_digicert.plugin import map_fields values = {
"DIGICERT_ORG_ID": 111111,
names = [u"one.example.com", u"two.example.com", u"three.example.com"] "DIGICERT_PRIVATE": False,
"DIGICERT_DEFAULT_SIGNING_ALGORITHM": "sha256",
options = { "DIGICERT_DEFAULT_VALIDITY": 1,
"common_name": "example.com", "DIGICERT_MAX_VALIDITY": 2,
"owner": "bob@example.com", "DIGICERT_CIS_PROFILE_NAMES": {"digicert": 'digicert'},
"description": "test certificate", "DIGICERT_CIS_SIGNING_ALGORITHMS": {"digicert": 'digicert'},
"extensions": {"sub_alt_names": {"names": [x509.DNSName(x) for x in names]}},
"validity_end": arrow.get(2017, 5, 7),
"validity_start": arrow.get(2016, 10, 30),
}
data = map_fields(options, CSR_STR)
assert data == {
"certificate": {
"csr": CSR_STR,
"common_name": "example.com",
"dns_names": names,
"signature_hash": "sha256",
},
"organization": {"id": 111111},
"custom_expiration_date": arrow.get(2017, 5, 7).format("YYYY-MM-DD"),
} }
return values[args[0]]
def test_map_fields_with_validity_years(app): @patch("lemur.plugins.lemur_digicert.plugin.current_app")
from lemur.plugins.lemur_digicert.plugin import map_fields def test_determine_validity_years(mock_current_app):
mock_current_app.config.get = Mock(return_value=2)
names = [u"one.example.com", u"two.example.com", u"three.example.com"] assert plugin.determine_validity_years(1) == 1
assert plugin.determine_validity_years(0) == 2
options = { assert plugin.determine_validity_years(3) == 2
"common_name": "example.com",
"owner": "bob@example.com",
"description": "test certificate",
"extensions": {"sub_alt_names": {"names": [x509.DNSName(x) for x in names]}},
"validity_years": 2,
"validity_end": arrow.get(2017, 10, 30),
}
data = map_fields(options, CSR_STR)
assert data == {
"certificate": {
"csr": CSR_STR,
"common_name": "example.com",
"dns_names": names,
"signature_hash": "sha256",
},
"organization": {"id": 111111},
"validity_years": 2,
}
def test_map_cis_fields(app, authority): @patch("lemur.plugins.lemur_digicert.plugin.current_app")
from lemur.plugins.lemur_digicert.plugin import map_cis_fields def test_determine_end_date(mock_current_app):
mock_current_app.config.get = Mock(return_value=2)
names = [u"one.example.com", u"two.example.com", u"three.example.com"]
options = {
"common_name": "example.com",
"owner": "bob@example.com",
"description": "test certificate",
"extensions": {"sub_alt_names": {"names": [x509.DNSName(x) for x in names]}},
"organization": "Example, Inc.",
"organizational_unit": "Example Org",
"validity_end": arrow.get(2017, 5, 7),
"validity_start": arrow.get(2016, 10, 30),
"authority": authority,
}
data = map_cis_fields(options, CSR_STR)
assert data == {
"common_name": "example.com",
"csr": CSR_STR,
"additional_dns_names": names,
"signature_hash": "sha256",
"organization": {"name": "Example, Inc.", "units": ["Example Org"]},
"validity": {
"valid_to": arrow.get(2017, 5, 7).format("YYYY-MM-DDTHH:MM") + "Z"
},
"profile_name": None,
}
options = {
"common_name": "example.com",
"owner": "bob@example.com",
"description": "test certificate",
"extensions": {"sub_alt_names": {"names": [x509.DNSName(x) for x in names]}},
"organization": "Example, Inc.",
"organizational_unit": "Example Org",
"validity_years": 2,
"authority": authority,
}
with freeze_time(time_to_freeze=arrow.get(2016, 11, 3).datetime): with freeze_time(time_to_freeze=arrow.get(2016, 11, 3).datetime):
data = map_cis_fields(options, CSR_STR) assert arrow.get(2018, 11, 3) == plugin.determine_end_date(0)
assert arrow.get(2018, 5, 7) == plugin.determine_end_date(arrow.get(2018, 5, 7))
assert arrow.get(2018, 11, 3) == plugin.determine_end_date(arrow.get(2020, 5, 7))
assert data == {
@patch("lemur.plugins.lemur_digicert.plugin.current_app")
def test_map_fields_with_validity_years(mock_current_app):
mock_current_app.config.get = Mock(side_effect=config_mock)
with patch('lemur.plugins.lemur_digicert.plugin.signature_hash') as mock_signature_hash:
mock_signature_hash.return_value = "sha256"
names = [u"one.example.com", u"two.example.com", u"three.example.com"]
options = {
"common_name": "example.com",
"owner": "bob@example.com",
"description": "test certificate",
"extensions": {"sub_alt_names": {"names": [x509.DNSName(x) for x in names]}},
"validity_years": 2
}
expected = {
"certificate": {
"csr": CSR_STR,
"common_name": "example.com",
"dns_names": names,
"signature_hash": "sha256",
},
"organization": {"id": 111111},
"validity_years": 2,
}
assert expected == plugin.map_fields(options, CSR_STR)
@patch("lemur.plugins.lemur_digicert.plugin.current_app")
def test_map_fields_with_validity_end_and_start(mock_current_app):
mock_current_app.config.get = Mock(side_effect=config_mock)
plugin.determine_end_date = Mock(return_value=arrow.get(2017, 5, 7))
with patch('lemur.plugins.lemur_digicert.plugin.signature_hash') as mock_signature_hash:
mock_signature_hash.return_value = "sha256"
names = [u"one.example.com", u"two.example.com", u"three.example.com"]
options = {
"common_name": "example.com",
"owner": "bob@example.com",
"description": "test certificate",
"extensions": {"sub_alt_names": {"names": [x509.DNSName(x) for x in names]}},
"validity_end": arrow.get(2017, 5, 7),
"validity_start": arrow.get(2016, 10, 30),
}
expected = {
"certificate": {
"csr": CSR_STR,
"common_name": "example.com",
"dns_names": names,
"signature_hash": "sha256",
},
"organization": {"id": 111111},
"custom_expiration_date": arrow.get(2017, 5, 7).format("YYYY-MM-DD"),
}
assert expected == plugin.map_fields(options, CSR_STR)
@patch("lemur.plugins.lemur_digicert.plugin.current_app")
def test_map_cis_fields_with_validity_years(mock_current_app, authority):
mock_current_app.config.get = Mock(side_effect=config_mock)
plugin.determine_end_date = Mock(return_value=arrow.get(2018, 11, 3))
with patch('lemur.plugins.lemur_digicert.plugin.signature_hash') as mock_signature_hash:
mock_signature_hash.return_value = "sha256"
names = [u"one.example.com", u"two.example.com", u"three.example.com"]
options = {
"common_name": "example.com",
"owner": "bob@example.com",
"description": "test certificate",
"extensions": {"sub_alt_names": {"names": [x509.DNSName(x) for x in names]}},
"organization": "Example, Inc.",
"organizational_unit": "Example Org",
"validity_years": 2,
"authority": authority,
}
expected = {
"common_name": "example.com", "common_name": "example.com",
"csr": CSR_STR, "csr": CSR_STR,
"additional_dns_names": names, "additional_dns_names": names,
@ -123,21 +131,59 @@ def test_map_cis_fields(app, authority):
"profile_name": None, "profile_name": None,
} }
assert expected == plugin.map_cis_fields(options, CSR_STR)
def test_signature_hash(app):
from lemur.plugins.lemur_digicert.plugin import signature_hash
assert signature_hash(None) == "sha256" @patch("lemur.plugins.lemur_digicert.plugin.current_app")
assert signature_hash("sha256WithRSA") == "sha256" def test_map_cis_fields_with_validity_end_and_start(mock_current_app, app, authority):
assert signature_hash("sha384WithRSA") == "sha384" mock_current_app.config.get = Mock(side_effect=config_mock)
assert signature_hash("sha512WithRSA") == "sha512" plugin.determine_end_date = Mock(return_value=arrow.get(2017, 5, 7))
with patch('lemur.plugins.lemur_digicert.plugin.signature_hash') as mock_signature_hash:
mock_signature_hash.return_value = "sha256"
names = [u"one.example.com", u"two.example.com", u"three.example.com"]
options = {
"common_name": "example.com",
"owner": "bob@example.com",
"description": "test certificate",
"extensions": {"sub_alt_names": {"names": [x509.DNSName(x) for x in names]}},
"organization": "Example, Inc.",
"organizational_unit": "Example Org",
"validity_end": arrow.get(2017, 5, 7),
"validity_start": arrow.get(2016, 10, 30),
"authority": authority
}
expected = {
"common_name": "example.com",
"csr": CSR_STR,
"additional_dns_names": names,
"signature_hash": "sha256",
"organization": {"name": "Example, Inc.", "units": ["Example Org"]},
"validity": {
"valid_to": arrow.get(2017, 5, 7).format("YYYY-MM-DDTHH:MM") + "Z"
},
"profile_name": None,
}
assert expected == plugin.map_cis_fields(options, CSR_STR)
@patch("lemur.plugins.lemur_digicert.plugin.current_app")
def test_signature_hash(mock_current_app, app):
mock_current_app.config.get = Mock(side_effect=config_mock)
assert plugin.signature_hash(None) == "sha256"
assert plugin.signature_hash("sha256WithRSA") == "sha256"
assert plugin.signature_hash("sha384WithRSA") == "sha384"
assert plugin.signature_hash("sha512WithRSA") == "sha512"
with pytest.raises(Exception): with pytest.raises(Exception):
signature_hash("sdfdsf") plugin.signature_hash("sdfdsf")
def test_issuer_plugin_create_certificate( def test_issuer_plugin_create_certificate(
certificate_="""\ certificate_="""\
-----BEGIN CERTIFICATE----- -----BEGIN CERTIFICATE-----
abc abc
-----END CERTIFICATE----- -----END CERTIFICATE-----