diff --git a/lemur/plugins/lemur_digicert/plugin.py b/lemur/plugins/lemur_digicert/plugin.py index 88ea5b6b..b9508357 100644 --- a/lemur/plugins/lemur_digicert/plugin.py +++ b/lemur/plugins/lemur_digicert/plugin.py @@ -14,21 +14,17 @@ .. moduleauthor:: Kevin Glisson """ import json + import arrow -import requests - import pem -from retrying import retry - -from flask import current_app - +import requests from cryptography import x509 - -from lemur.extensions import metrics +from flask import current_app from lemur.common.utils import validate_conf -from lemur.plugins.bases import IssuerPlugin, SourcePlugin - +from lemur.extensions import metrics from lemur.plugins import lemur_digicert as digicert +from lemur.plugins.bases import IssuerPlugin, SourcePlugin +from retrying import retry def log_status_code(r, *args, **kwargs): @@ -64,24 +60,38 @@ def signature_hash(signing_algorithm): raise Exception("Unsupported signing algorithm.") -def determine_validity_years(end_date): +def determine_validity_years(years): """Given an end date determine how many years into the future that date is. + :param years: + :return: validity in years + """ + default_years = current_app.config.get("DIGICERT_DEFAULT_VALIDITY", 1) + max_years = current_app.config.get("DIGICERT_MAX_VALIDITY", default_years) + + if years > max_years: + return max_years + if years not in [1, 2, 3]: + return default_years + else: + return years + + +def determine_end_date(end_date): + """ + Determine appropriate end date :param end_date: - :return: str validity in years + :return: validity_end """ - now = arrow.utcnow() + default_years = current_app.config.get("DIGICERT_DEFAULT_VALIDITY", 1) + max_validity_end = arrow.utcnow().shift(years=current_app.config.get("DIGICERT_MAX_VALIDITY", default_years)) - if end_date < now.shift(years=+1): - return 1 - elif end_date < now.shift(years=+2): - return 2 - elif end_date < now.shift(years=+3): - return 3 + if not end_date: + end_date = arrow.utcnow().shift(years=default_years) - raise Exception( - "DigiCert issued certificates cannot exceed three" " years in validity" - ) + if end_date > max_validity_end: + end_date = max_validity_end + return end_date def get_additional_names(options): @@ -107,12 +117,6 @@ def map_fields(options, csr): :param csr: :return: dict or valid DigiCert options """ - if not options.get("validity_years"): - if not options.get("validity_end"): - options["validity_years"] = current_app.config.get( - "DIGICERT_DEFAULT_VALIDITY", 1 - ) - data = dict( certificate={ "common_name": options["common_name"], @@ -125,9 +129,11 @@ def map_fields(options, csr): data["certificate"]["dns_names"] = get_additional_names(options) if options.get("validity_years"): - data["validity_years"] = options["validity_years"] + data["validity_years"] = determine_validity_years(options.get("validity_years")) + elif options.get("validity_end"): + data["custom_expiration_date"] = determine_end_date(options.get("validity_end")).format("YYYY-MM-DD") else: - data["custom_expiration_date"] = options["validity_end"].format("YYYY-MM-DD") + data["validity_years"] = determine_validity_years(0) if current_app.config.get("DIGICERT_PRIVATE", False): if "product" in data: @@ -144,18 +150,15 @@ def map_cis_fields(options, csr): :param options: :param csr: - :return: + :return: data """ - if not options.get("validity_years"): - if not options.get("validity_end"): - options["validity_end"] = arrow.utcnow().shift( - years=current_app.config.get("DIGICERT_DEFAULT_VALIDITY", 1) - ) - options["validity_years"] = determine_validity_years(options["validity_end"]) + + if options.get("validity_years"): + validity_end = determine_end_date(arrow.utcnow().shift(years=options["validity_years"])) + elif options.get("validity_end"): + validity_end = determine_end_date(options.get("validity_end")) else: - options["validity_end"] = arrow.utcnow().shift( - years=options["validity_years"] - ) + validity_end = determine_end_date(False) data = { "profile_name": current_app.config.get("DIGICERT_CIS_PROFILE_NAMES", {}).get(options['authority'].name), @@ -164,7 +167,7 @@ def map_cis_fields(options, csr): "csr": csr, "signature_hash": signature_hash(options.get("signing_algorithm")), "validity": { - "valid_to": options["validity_end"].format("YYYY-MM-DDTHH:MM") + "Z" + "valid_to": validity_end.format("YYYY-MM-DDTHH:MM") + "Z" }, "organization": { "name": options["organization"], @@ -173,7 +176,8 @@ def map_cis_fields(options, csr): } # possibility to default to a SIGNING_ALGORITHM for a given profile if current_app.config.get("DIGICERT_CIS_SIGNING_ALGORITHMS", {}).get(options['authority'].name): - data["signature_hash"] = current_app.config.get("DIGICERT_CIS_SIGNING_ALGORITHMS", {}).get(options['authority'].name) + data["signature_hash"] = current_app.config.get("DIGICERT_CIS_SIGNING_ALGORITHMS", {}).get( + options['authority'].name) return data diff --git a/lemur/plugins/lemur_digicert/tests/test_digicert.py b/lemur/plugins/lemur_digicert/tests/test_digicert.py index 77b0a1fa..b9576f51 100644 --- a/lemur/plugins/lemur_digicert/tests/test_digicert.py +++ b/lemur/plugins/lemur_digicert/tests/test_digicert.py @@ -1,117 +1,125 @@ -import pytest -import arrow import json -from unittest.mock import patch - -from freezegun import freeze_time - -from lemur.tests.vectors import CSR_STR +import arrow +import pytest from cryptography import x509 +from freezegun import freeze_time +from lemur.plugins.lemur_digicert import plugin +from lemur.tests.vectors import CSR_STR +from mock import Mock, patch -def test_map_fields_with_validity_end_and_start(app): - from lemur.plugins.lemur_digicert.plugin import map_fields - - names = [u"one.example.com", u"two.example.com", u"three.example.com"] - - options = { - "common_name": "example.com", - "owner": "bob@example.com", - "description": "test certificate", - "extensions": {"sub_alt_names": {"names": [x509.DNSName(x) for x in names]}}, - "validity_end": arrow.get(2017, 5, 7), - "validity_start": arrow.get(2016, 10, 30), - } - - data = map_fields(options, CSR_STR) - - assert data == { - "certificate": { - "csr": CSR_STR, - "common_name": "example.com", - "dns_names": names, - "signature_hash": "sha256", - }, - "organization": {"id": 111111}, - "custom_expiration_date": arrow.get(2017, 5, 7).format("YYYY-MM-DD"), +def config_mock(*args): + values = { + "DIGICERT_ORG_ID": 111111, + "DIGICERT_PRIVATE": False, + "DIGICERT_DEFAULT_SIGNING_ALGORITHM": "sha256", + "DIGICERT_DEFAULT_VALIDITY": 1, + "DIGICERT_MAX_VALIDITY": 2, + "DIGICERT_CIS_PROFILE_NAMES": {"digicert": 'digicert'}, + "DIGICERT_CIS_SIGNING_ALGORITHMS": {"verisign-sha1-intermediary": 'sha1'}, } + return values[args[0]] -def test_map_fields_with_validity_years(app): - from lemur.plugins.lemur_digicert.plugin import map_fields - - names = [u"one.example.com", u"two.example.com", u"three.example.com"] - - options = { - "common_name": "example.com", - "owner": "bob@example.com", - "description": "test certificate", - "extensions": {"sub_alt_names": {"names": [x509.DNSName(x) for x in names]}}, - "validity_years": 2, - "validity_end": arrow.get(2017, 10, 30), - } - - data = map_fields(options, CSR_STR) - - assert data == { - "certificate": { - "csr": CSR_STR, - "common_name": "example.com", - "dns_names": names, - "signature_hash": "sha256", - }, - "organization": {"id": 111111}, - "validity_years": 2, - } +@patch("lemur.plugins.lemur_digicert.plugin.current_app") +def test_determine_validity_years(mock_current_app): + mock_current_app.config.get = Mock(return_value=2) + assert plugin.determine_validity_years(1) == 1 + assert plugin.determine_validity_years(0) == 2 + assert plugin.determine_validity_years(3) == 2 -def test_map_cis_fields(app, authority): - from lemur.plugins.lemur_digicert.plugin import map_cis_fields - - names = [u"one.example.com", u"two.example.com", u"three.example.com"] - - options = { - "common_name": "example.com", - "owner": "bob@example.com", - "description": "test certificate", - "extensions": {"sub_alt_names": {"names": [x509.DNSName(x) for x in names]}}, - "organization": "Example, Inc.", - "organizational_unit": "Example Org", - "validity_end": arrow.get(2017, 5, 7), - "validity_start": arrow.get(2016, 10, 30), - "authority": authority, - } - - data = map_cis_fields(options, CSR_STR) - - assert data == { - "common_name": "example.com", - "csr": CSR_STR, - "additional_dns_names": names, - "signature_hash": "sha256", - "organization": {"name": "Example, Inc.", "units": ["Example Org"]}, - "validity": { - "valid_to": arrow.get(2017, 5, 7).format("YYYY-MM-DDTHH:MM") + "Z" - }, - "profile_name": None, - } - - options = { - "common_name": "example.com", - "owner": "bob@example.com", - "description": "test certificate", - "extensions": {"sub_alt_names": {"names": [x509.DNSName(x) for x in names]}}, - "organization": "Example, Inc.", - "organizational_unit": "Example Org", - "validity_years": 2, - "authority": authority, - } - +@patch("lemur.plugins.lemur_digicert.plugin.current_app") +def test_determine_end_date(mock_current_app): + mock_current_app.config.get = Mock(return_value=2) with freeze_time(time_to_freeze=arrow.get(2016, 11, 3).datetime): - data = map_cis_fields(options, CSR_STR) + assert arrow.get(2018, 11, 3) == plugin.determine_end_date(0) + assert arrow.get(2018, 5, 7) == plugin.determine_end_date(arrow.get(2018, 5, 7)) + assert arrow.get(2018, 11, 3) == plugin.determine_end_date(arrow.get(2020, 5, 7)) - assert data == { + +@patch("lemur.plugins.lemur_digicert.plugin.current_app") +def test_map_fields_with_validity_years(mock_current_app): + mock_current_app.config.get = Mock(side_effect=config_mock) + + with patch('lemur.plugins.lemur_digicert.plugin.signature_hash') as mock_signature_hash: + mock_signature_hash.return_value = "sha256" + + names = [u"one.example.com", u"two.example.com", u"three.example.com"] + options = { + "common_name": "example.com", + "owner": "bob@example.com", + "description": "test certificate", + "extensions": {"sub_alt_names": {"names": [x509.DNSName(x) for x in names]}}, + "validity_years": 2 + } + expected = { + "certificate": { + "csr": CSR_STR, + "common_name": "example.com", + "dns_names": names, + "signature_hash": "sha256", + }, + "organization": {"id": 111111}, + "validity_years": 2, + } + assert expected == plugin.map_fields(options, CSR_STR) + + +@patch("lemur.plugins.lemur_digicert.plugin.current_app") +def test_map_fields_with_validity_end_and_start(mock_current_app): + mock_current_app.config.get = Mock(side_effect=config_mock) + plugin.determine_end_date = Mock(return_value=arrow.get(2017, 5, 7)) + + with patch('lemur.plugins.lemur_digicert.plugin.signature_hash') as mock_signature_hash: + mock_signature_hash.return_value = "sha256" + + names = [u"one.example.com", u"two.example.com", u"three.example.com"] + options = { + "common_name": "example.com", + "owner": "bob@example.com", + "description": "test certificate", + "extensions": {"sub_alt_names": {"names": [x509.DNSName(x) for x in names]}}, + "validity_end": arrow.get(2017, 5, 7), + "validity_start": arrow.get(2016, 10, 30), + } + + expected = { + "certificate": { + "csr": CSR_STR, + "common_name": "example.com", + "dns_names": names, + "signature_hash": "sha256", + }, + "organization": {"id": 111111}, + "custom_expiration_date": arrow.get(2017, 5, 7).format("YYYY-MM-DD"), + } + + assert expected == plugin.map_fields(options, CSR_STR) + + +@patch("lemur.plugins.lemur_digicert.plugin.current_app") +def test_map_cis_fields_with_validity_years(mock_current_app, authority): + mock_current_app.config.get = Mock(side_effect=config_mock) + plugin.determine_end_date = Mock(return_value=arrow.get(2018, 11, 3)) + + with patch('lemur.plugins.lemur_digicert.plugin.signature_hash') as mock_signature_hash: + mock_signature_hash.return_value = "sha256" + + names = [u"one.example.com", u"two.example.com", u"three.example.com"] + options = { + "common_name": "example.com", + "owner": "bob@example.com", + "description": "test certificate", + "extensions": {"sub_alt_names": {"names": [x509.DNSName(x) for x in names]}}, + "organization": "Example, Inc.", + "organizational_unit": "Example Org", + "validity_years": 2, + "authority": authority, + } + + expected = { "common_name": "example.com", "csr": CSR_STR, "additional_dns_names": names, @@ -123,21 +131,59 @@ def test_map_cis_fields(app, authority): "profile_name": None, } + assert expected == plugin.map_cis_fields(options, CSR_STR) -def test_signature_hash(app): - from lemur.plugins.lemur_digicert.plugin import signature_hash - assert signature_hash(None) == "sha256" - assert signature_hash("sha256WithRSA") == "sha256" - assert signature_hash("sha384WithRSA") == "sha384" - assert signature_hash("sha512WithRSA") == "sha512" +@patch("lemur.plugins.lemur_digicert.plugin.current_app") +def test_map_cis_fields_with_validity_end_and_start(mock_current_app, app, authority): + mock_current_app.config.get = Mock(side_effect=config_mock) + plugin.determine_end_date = Mock(return_value=arrow.get(2017, 5, 7)) + + with patch('lemur.plugins.lemur_digicert.plugin.signature_hash') as mock_signature_hash: + mock_signature_hash.return_value = "sha256" + + names = [u"one.example.com", u"two.example.com", u"three.example.com"] + options = { + "common_name": "example.com", + "owner": "bob@example.com", + "description": "test certificate", + "extensions": {"sub_alt_names": {"names": [x509.DNSName(x) for x in names]}}, + "organization": "Example, Inc.", + "organizational_unit": "Example Org", + "validity_end": arrow.get(2017, 5, 7), + "validity_start": arrow.get(2016, 10, 30), + "authority": authority + } + + expected = { + "common_name": "example.com", + "csr": CSR_STR, + "additional_dns_names": names, + "signature_hash": "sha256", + "organization": {"name": "Example, Inc.", "units": ["Example Org"]}, + "validity": { + "valid_to": arrow.get(2017, 5, 7).format("YYYY-MM-DDTHH:MM") + "Z" + }, + "profile_name": None, + } + + assert expected == plugin.map_cis_fields(options, CSR_STR) + + +@patch("lemur.plugins.lemur_digicert.plugin.current_app") +def test_signature_hash(mock_current_app, app): + mock_current_app.config.get = Mock(side_effect=config_mock) + assert plugin.signature_hash(None) == "sha256" + assert plugin.signature_hash("sha256WithRSA") == "sha256" + assert plugin.signature_hash("sha384WithRSA") == "sha384" + assert plugin.signature_hash("sha512WithRSA") == "sha512" with pytest.raises(Exception): - signature_hash("sdfdsf") + plugin.signature_hash("sdfdsf") def test_issuer_plugin_create_certificate( - certificate_="""\ + certificate_="""\ -----BEGIN CERTIFICATE----- abc -----END CERTIFICATE-----