Merge pull request #3143 from hosseinsh/entrust-revised

Entrust plugin revised
This commit is contained in:
Hossein Shafagh 2020-09-23 14:53:18 -07:00 committed by GitHub
commit df8d6e5d7f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 164 additions and 35 deletions

View File

@ -1,9 +1,12 @@
from lemur.plugins.bases import IssuerPlugin, SourcePlugin
import arrow import arrow
import requests import requests
import json import json
from lemur.plugins import lemur_entrust as ENTRUST import sys
from flask import current_app from flask import current_app
from lemur.plugins import lemur_entrust as entrust
from lemur.plugins.bases import IssuerPlugin, SourcePlugin
from lemur.extensions import metrics from lemur.extensions import metrics
from lemur.common.utils import validate_conf from lemur.common.utils import validate_conf
@ -17,24 +20,24 @@ def log_status_code(r, *args, **kwargs):
:param kwargs: :param kwargs:
:return: :return:
""" """
metrics.send("ENTRUST_status_code_{}".format(r.status_code), "counter", 1) metrics.send(f"entrust_status_code_{r.status_code}", "counter", 1)
def determine_end_date(end_date): def determine_end_date(end_date):
""" """
Determine appropriate end date Determine appropriate end date
:param end_date: :param end_date:
:return: validity_end :return: validity_end as string
""" """
# ENTRUST only allows 13 months of max certificate duration # ENTRUST only allows 13 months of max certificate duration
max_validity_end = arrow.utcnow().shift(years=1, months=+1).format('YYYY-MM-DD') max_validity_end = arrow.utcnow().shift(years=1, months=+1)
if not end_date: if not end_date:
end_date = max_validity_end end_date = max_validity_end
if end_date > max_validity_end: if end_date > max_validity_end:
end_date = max_validity_end end_date = max_validity_end
return end_date return end_date.format('YYYY-MM-DD')
def process_options(options): def process_options(options):
@ -49,7 +52,10 @@ def process_options(options):
# take the value as Cert product-type # take the value as Cert product-type
# else default to "STANDARD_SSL" # else default to "STANDARD_SSL"
authority = options.get("authority").name.upper() authority = options.get("authority").name.upper()
product_type = current_app.config.get("ENTRUST_PRODUCT_{0}".format(authority), "STANDARD_SSL") # STANDARD_SSL (cn=domain, san=www.domain),
# ADVANTAGE_SSL (cn=domain, san=[www.domain, one_more_option]),
# WILDCARD_SSL (unlimited sans, and wildcard)
product_type = current_app.config.get(f"ENTRUST_PRODUCT_{authority}", "STANDARD_SSL")
if options.get("validity_end"): if options.get("validity_end"):
validity_end = determine_end_date(options.get("validity_end")) validity_end = determine_end_date(options.get("validity_end"))
@ -67,6 +73,7 @@ def process_options(options):
"eku": "SERVER_AND_CLIENT_AUTH", "eku": "SERVER_AND_CLIENT_AUTH",
"certType": product_type, "certType": product_type,
"certExpiryDate": validity_end, "certExpiryDate": validity_end,
# "keyType": "RSA", Entrust complaining about this parameter
"tracking": tracking_data "tracking": tracking_data
} }
return data return data
@ -86,23 +93,31 @@ def handle_response(my_response):
404: "Unknown jobId", 404: "Unknown jobId",
429: "Too many requests" 429: "Too many requests"
} }
try: try:
d = json.loads(my_response.content) d = json.loads(my_response.content)
except Exception as e: except ValueError:
# catch an empty jason object here # catch an empty jason object here
d = {'errors': 'No detailled message'} d = {'response': 'No detailed message'}
s = my_response.status_code s = my_response.status_code
if s > 399: if s > 399:
raise Exception("ENTRUST error: {0}\n{1}".format(msg.get(s, s), d['errors'])) raise Exception(f"ENTRUST error: {msg.get(s, s)}\n{d['errors']}")
current_app.logger.info("Response: {0}, {1} ".format(s, d))
log_data = {
"function": f"{__name__}.{sys._getframe().f_code.co_name}",
"message": "Response",
"status": s,
"response": d
}
current_app.logger.info(log_data)
return d return d
class EntrustIssuerPlugin(IssuerPlugin): class EntrustIssuerPlugin(IssuerPlugin):
title = "ENTRUST" title = "Entrust"
slug = "entrust-issuer" slug = "entrust-issuer"
description = "Enables the creation of certificates by ENTRUST" description = "Enables the creation of certificates by ENTRUST"
version = ENTRUST.VERSION version = entrust.VERSION
author = "sirferl" author = "sirferl"
author_url = "https://github.com/sirferl/lemur" author_url = "https://github.com/sirferl/lemur"
@ -119,7 +134,6 @@ class EntrustIssuerPlugin(IssuerPlugin):
"ENTRUST_NAME", "ENTRUST_NAME",
"ENTRUST_EMAIL", "ENTRUST_EMAIL",
"ENTRUST_PHONE", "ENTRUST_PHONE",
"ENTRUST_ISSUING",
] ]
validate_conf(current_app, required_vars) validate_conf(current_app, required_vars)
@ -142,9 +156,12 @@ class EntrustIssuerPlugin(IssuerPlugin):
:param issuer_options: :param issuer_options:
:return: :raise Exception: :return: :raise Exception:
""" """
current_app.logger.info( log_data = {
"Requesting options: {0}".format(issuer_options) "function": f"{__name__}.{sys._getframe().f_code.co_name}",
) "message": "Requesting options",
"options": issuer_options
}
current_app.logger.info(log_data)
url = current_app.config.get("ENTRUST_URL") + "/certificates" url = current_app.config.get("ENTRUST_URL") + "/certificates"
@ -156,36 +173,46 @@ class EntrustIssuerPlugin(IssuerPlugin):
except requests.exceptions.Timeout: except requests.exceptions.Timeout:
raise Exception("Timeout for POST") raise Exception("Timeout for POST")
except requests.exceptions.RequestException as e: except requests.exceptions.RequestException as e:
raise Exception("Error for POST {0}".format(e)) raise Exception(f"Error for POST {e}")
response_dict = handle_response(response) response_dict = handle_response(response)
external_id = response_dict['trackingId'] external_id = response_dict['trackingId']
cert = response_dict['endEntityCert'] cert = response_dict['endEntityCert']
chain = response_dict['chainCerts'][1] if len(response_dict['chainCerts']) < 2:
current_app.logger.info( # certificate signed by CA directly, no ICA included ini the chain
"Received Chain: {0}".format(chain) chain = None
) else:
chain = response_dict['chainCerts'][1]
log_data["message"] = "Received Chain"
log_data["options"] = f"chain: {chain}"
current_app.logger.info(log_data)
return cert, chain, external_id return cert, chain, external_id
def revoke_certificate(self, certificate, comments): def revoke_certificate(self, certificate, comments):
"""Revoke a Digicert certificate.""" """Revoke an Entrust certificate."""
base_url = current_app.config.get("ENTRUST_URL") base_url = current_app.config.get("ENTRUST_URL")
# make certificate revoke request # make certificate revoke request
revoke_url = "{0}/certificates/{1}/revocations".format( revoke_url = f"{base_url}/certificates/{certificate.external_id}/revocations"
base_url, certificate.external_id if not comments or comments == '':
)
metrics.send("entrust_revoke_certificate", "counter", 1)
if comments == '' or not comments:
comments = "revoked via API" comments = "revoked via API"
data = { data = {
"crlReason": "superseded", "crlReason": "superseded", # enum (keyCompromise, affiliationChanged, superseded, cessationOfOperation)
"revocationComment": comments "revocationComment": comments
} }
response = self.session.post(revoke_url, json=data) response = self.session.post(revoke_url, json=data)
metrics.send("entrust_revoke_certificate", "counter", 1)
return handle_response(response)
data = handle_response(response) def deactivate_certificate(self, certificate):
"""Deactivates an Entrust certificate."""
base_url = current_app.config.get("ENTRUST_URL")
deactivate_url = f"{base_url}/certificates/{certificate.external_id}/deactivations"
response = self.session.post(deactivate_url)
metrics.send("entrust_deactivate_certificate", "counter", 1)
return handle_response(response)
@staticmethod @staticmethod
def create_authority(options): def create_authority(options):
@ -200,7 +227,8 @@ class EntrustIssuerPlugin(IssuerPlugin):
entrust_root = current_app.config.get("ENTRUST_ROOT") entrust_root = current_app.config.get("ENTRUST_ROOT")
entrust_issuing = current_app.config.get("ENTRUST_ISSUING") entrust_issuing = current_app.config.get("ENTRUST_ISSUING")
role = {"username": "", "password": "", "name": "entrust"} role = {"username": "", "password": "", "name": "entrust"}
current_app.logger.info("Creating Auth: {0} {1}".format(options, entrust_issuing)) current_app.logger.info(f"Creating Auth: {options} {entrust_issuing}")
# body, chain, role
return entrust_root, "", [role] return entrust_root, "", [role]
def get_ordered_certificate(self, order_id): def get_ordered_certificate(self, order_id):
@ -211,10 +239,10 @@ class EntrustIssuerPlugin(IssuerPlugin):
class EntrustSourcePlugin(SourcePlugin): class EntrustSourcePlugin(SourcePlugin):
title = "ENTRUST" title = "Entrust"
slug = "entrust-source" slug = "entrust-source"
description = "Enables the collecion of certificates" description = "Enables the collection of certificates"
version = ENTRUST.VERSION version = entrust.VERSION
author = "sirferl" author = "sirferl"
author_url = "https://github.com/sirferl/lemur" author_url = "https://github.com/sirferl/lemur"

View File

@ -0,0 +1 @@
from lemur.tests.conftest import * # noqa

View File

@ -0,0 +1,54 @@
from unittest.mock import patch, Mock
import arrow
from cryptography import x509
from lemur.plugins.lemur_entrust import plugin
def config_mock(*args):
values = {
"ENTRUST_API_CERT": "-----BEGIN CERTIFICATE-----abc-----END CERTIFICATE-----",
"ENTRUST_API_KEY": False,
"ENTRUST_API_USER": "test",
"ENTRUST_API_PASS": "password",
"ENTRUST_URL": "http",
"ENTRUST_ROOT": None,
"ENTRUST_NAME": "test",
"ENTRUST_EMAIL": "test@lemur.net",
"ENTRUST_PHONE": "0123456",
"ENTRUST_PRODUCT_ENTRUST": "ADVANTAGE_SSL"
}
return values[args[0]]
@patch("lemur.plugins.lemur_entrust.plugin.current_app")
def test_process_options(mock_current_app, authority):
mock_current_app.config.get = Mock(side_effect=config_mock)
plugin.determine_end_date = Mock(return_value=arrow.get(2020, 10, 7).format('YYYY-MM-DD'))
authority.name = "Entrust"
names = [u"one.example.com", u"two.example.com", u"three.example.com"]
options = {
"common_name": "example.com",
"owner": "bob@example.com",
"description": "test certificate",
"extensions": {"sub_alt_names": {"names": [x509.DNSName(x) for x in names]}},
"organization": "Example, Inc.",
"organizational_unit": "Example Org",
"validity_end": arrow.get(2020, 10, 7),
"authority": authority,
}
expected = {
"signingAlg": "SHA-2",
"eku": "SERVER_AND_CLIENT_AUTH",
"certType": "ADVANTAGE_SSL",
"certExpiryDate": arrow.get(2020, 10, 7).format('YYYY-MM-DD'),
"tracking": {
"requesterName": mock_current_app.config.get("ENTRUST_NAME"),
"requesterEmail": mock_current_app.config.get("ENTRUST_EMAIL"),
"requesterPhone": mock_current_app.config.get("ENTRUST_PHONE")
}
}
assert expected == plugin.process_options(options)

View File

@ -1,9 +1,18 @@
# This is just Python which means you can inherit and tweak settings # This is just Python which means you can inherit and tweak settings
import os import os
import random
import string
_basedir = os.path.abspath(os.path.dirname(__file__)) _basedir = os.path.abspath(os.path.dirname(__file__))
# generate random secrets for unittest
def get_random_secret(length):
input_ascii = string.ascii_letters + string.digits
return ''.join(random.choice(input_ascii) for i in range(length))
THREADS_PER_PAGE = 8 THREADS_PER_PAGE = 8
# General # General
@ -86,7 +95,6 @@ DIGICERT_CIS_API_KEY = "api-key"
DIGICERT_CIS_ROOTS = {"root": "ROOT"} DIGICERT_CIS_ROOTS = {"root": "ROOT"}
DIGICERT_CIS_INTERMEDIATES = {"inter": "INTERMEDIATE_CA_CERT"} DIGICERT_CIS_INTERMEDIATES = {"inter": "INTERMEDIATE_CA_CERT"}
VERISIGN_URL = "http://example.com" VERISIGN_URL = "http://example.com"
VERISIGN_PEM_PATH = "~/" VERISIGN_PEM_PATH = "~/"
VERISIGN_FIRST_NAME = "Jim" VERISIGN_FIRST_NAME = "Jim"
@ -197,3 +205,41 @@ LDAP_REQUIRED_GROUP = "Lemur Access"
LDAP_DEFAULT_ROLE = "role1" LDAP_DEFAULT_ROLE = "role1"
ALLOW_CERT_DELETION = True ALLOW_CERT_DELETION = True
ENTRUST_API_CERT = "api-cert"
ENTRUST_API_KEY = get_random_secret(32)
ENTRUST_API_USER = "user"
ENTRUST_API_PASS = get_random_secret(32)
ENTRUST_URL = "https://api.entrust.net/enterprise/v2"
ENTRUST_ROOT = """
-----BEGIN CERTIFICATE-----
MIIEPjCCAyagAwIBAgIESlOMKDANBgkqhkiG9w0BAQsFADCBvjELMAkGA1UEBhMC
VVMxFjAUBgNVBAoTDUVudHJ1c3QsIEluYy4xKDAmBgNVBAsTH1NlZSB3d3cuZW50
cnVzdC5uZXQvbGVnYWwtdGVybXMxOTA3BgNVBAsTMChjKSAyMDA5IEVudHJ1c3Qs
IEluYy4gLSBmb3IgYXV0aG9yaXplZCB1c2Ugb25seTEyMDAGA1UEAxMpRW50cnVz
dCBSb290IENlcnRpZmljYXRpb24gQXV0aG9yaXR5IC0gRzIwHhcNMDkwNzA3MTcy
NTU0WhcNMzAxMjA3MTc1NTU0WjCBvjELMAkGA1UEBhMCVVMxFjAUBgNVBAoTDUVu
dHJ1c3QsIEluYy4xKDAmBgNVBAsTH1NlZSB3d3cuZW50cnVzdC5uZXQvbGVnYWwt
dGVybXMxOTA3BgNVBAsTMChjKSAyMDA5IEVudHJ1c3QsIEluYy4gLSBmb3IgYXV0
aG9yaXplZCB1c2Ugb25seTEyMDAGA1UEAxMpRW50cnVzdCBSb290IENlcnRpZmlj
YXRpb24gQXV0aG9yaXR5IC0gRzIwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEK
AoIBAQC6hLZy254Ma+KZ6TABp3bqMriVQRrJ2mFOWHLP/vaCeb9zYQYKpSfYs1/T
RU4cctZOMvJyig/3gxnQaoCAAEUesMfnmr8SVycco2gvCoe9amsOXmXzHHfV1IWN
cCG0szLni6LVhjkCsbjSR87kyUnEO6fe+1R9V77w6G7CebI6C1XiUJgWMhNcL3hW
wcKUs/Ja5CeanyTXxuzQmyWC48zCxEXFjJd6BmsqEZ+pCm5IO2/b1BEZQvePB7/1
U1+cPvQXLOZprE4yTGJ36rfo5bs0vBmLrpxR57d+tVOxMyLlbc9wPBr64ptntoP0
jaWvYkxN4FisZDQSA/i2jZRjJKRxAgMBAAGjQjBAMA4GA1UdDwEB/wQEAwIBBjAP
BgNVHRMBAf8EBTADAQH/MB0GA1UdDgQWBBRqciZ60B7vfec7aVHUbI2fkBJmqzAN
BgkqhkiG9w0BAQsFAAOCAQEAeZ8dlsa2eT8ijYfThwMEYGprmi5ZiXMRrEPR9RP/
jTkrwPK9T3CMqS/qF8QLVJ7UG5aYMzyorWKiAHarWWluBh1+xLlEjZivEtRh2woZ
Rkfz6/djwUAFQKXSt/S1mja/qYh2iARVBCuch38aNzx+LaUa2NSJXsq9rD1s2G2v
1fN2D807iDginWyTmsQ9v4IbZT+mD12q/OWyFcq1rca8PdCE6OoGcrBNOTJ4vz4R
nAuknZoh8/CbCzB428Hch0P+vGOaysXCHMnHjf87ElgI5rY97HosTvuDls4MPGmH
VHOkc8KT/1EQrBVUAdj8BbGJoX90g5pJ19xOe4pIb4tF9g==
-----END CERTIFICATE-----
"""
ENTRUST_NAME = "lemur"
ENTRUST_EMAIL = "lemur@example.com"
ENTRUST_PHONE = "123456"
ENTRUST_ISSUING = ""
ENTRUST_PRODUCT_ENTRUST = "ADVANTAGE_SSL"