Merge branch 'master' into ADCS-plugin
This commit is contained in:
commit
054685fc38
|
@ -7,18 +7,21 @@ from lemur.extensions import sentry
|
||||||
from lemur.constants import SAN_NAMING_TEMPLATE, DEFAULT_NAMING_TEMPLATE
|
from lemur.constants import SAN_NAMING_TEMPLATE, DEFAULT_NAMING_TEMPLATE
|
||||||
|
|
||||||
|
|
||||||
def text_to_slug(value):
|
def text_to_slug(value, joiner='-'):
|
||||||
"""Normalize a string to a "slug" value, stripping character accents and removing non-alphanum characters."""
|
"""
|
||||||
|
Normalize a string to a "slug" value, stripping character accents and removing non-alphanum characters.
|
||||||
|
A series of non-alphanumeric characters is replaced with the joiner character.
|
||||||
|
"""
|
||||||
|
|
||||||
# Strip all character accents: decompose Unicode characters and then drop combining chars.
|
# Strip all character accents: decompose Unicode characters and then drop combining chars.
|
||||||
value = ''.join(c for c in unicodedata.normalize('NFKD', value) if not unicodedata.combining(c))
|
value = ''.join(c for c in unicodedata.normalize('NFKD', value) if not unicodedata.combining(c))
|
||||||
|
|
||||||
# Replace all remaining non-alphanumeric characters with '-'. Multiple characters get collapsed into a single dash.
|
# Replace all remaining non-alphanumeric characters with joiner string. Multiple characters get collapsed into a
|
||||||
# Except, keep 'xn--' used in IDNA domain names as is.
|
# single joiner. Except, keep 'xn--' used in IDNA domain names as is.
|
||||||
value = re.sub(r'[^A-Za-z0-9.]+(?<!xn--)', '-', value)
|
value = re.sub(r'[^A-Za-z0-9.]+(?<!xn--)', joiner, value)
|
||||||
|
|
||||||
# '-' in the beginning or end of string looks ugly.
|
# '-' in the beginning or end of string looks ugly.
|
||||||
return value.strip('-')
|
return value.strip(joiner)
|
||||||
|
|
||||||
|
|
||||||
def certificate_name(common_name, issuer, not_before, not_after, san):
|
def certificate_name(common_name, issuer, not_before, not_after, san):
|
||||||
|
@ -224,25 +227,20 @@ def bitstrength(cert):
|
||||||
|
|
||||||
def issuer(cert):
|
def issuer(cert):
|
||||||
"""
|
"""
|
||||||
Gets a sane issuer name from a given certificate.
|
Gets a sane issuer slug from a given certificate, stripping non-alphanumeric characters.
|
||||||
|
|
||||||
:param cert:
|
:param cert:
|
||||||
:return: Issuer
|
:return: Issuer slug
|
||||||
"""
|
"""
|
||||||
delchars = ''.join(c for c in map(chr, range(256)) if not c.isalnum())
|
# Try Common Name or fall back to Organization name
|
||||||
try:
|
attrs = (cert.issuer.get_attributes_for_oid(x509.OID_COMMON_NAME) or
|
||||||
# Try organization name or fall back to CN
|
|
||||||
issuer = (cert.issuer.get_attributes_for_oid(x509.OID_COMMON_NAME) or
|
|
||||||
cert.issuer.get_attributes_for_oid(x509.OID_ORGANIZATION_NAME))
|
cert.issuer.get_attributes_for_oid(x509.OID_ORGANIZATION_NAME))
|
||||||
issuer = str(issuer[0].value)
|
if not attrs:
|
||||||
for c in delchars:
|
current_app.logger.error("Unable to get issuer! Cert serial {:x}".format(cert.serial_number))
|
||||||
issuer = issuer.replace(c, "")
|
|
||||||
return issuer
|
|
||||||
except Exception as e:
|
|
||||||
sentry.captureException()
|
|
||||||
current_app.logger.error("Unable to get issuer! {0}".format(e))
|
|
||||||
return "Unknown"
|
return "Unknown"
|
||||||
|
|
||||||
|
return text_to_slug(attrs[0].value, '')
|
||||||
|
|
||||||
|
|
||||||
def not_before(cert):
|
def not_before(cert):
|
||||||
"""
|
"""
|
||||||
|
|
|
@ -21,6 +21,14 @@ COLUMNS = ["notification_id", "certificate_id"]
|
||||||
|
|
||||||
|
|
||||||
def upgrade():
|
def upgrade():
|
||||||
|
connection = op.get_bind()
|
||||||
|
# Delete duplicate entries
|
||||||
|
connection.execute("""\
|
||||||
|
DELETE FROM certificate_notification_associations WHERE ctid NOT IN (
|
||||||
|
-- Select the first tuple ID for each (notification_id, certificate_id) combination and keep that
|
||||||
|
SELECT min(ctid) FROM certificate_notification_associations GROUP BY notification_id, certificate_id
|
||||||
|
)
|
||||||
|
""")
|
||||||
op.create_unique_constraint(CONSTRAINT_NAME, TABLE, COLUMNS)
|
op.create_unique_constraint(CONSTRAINT_NAME, TABLE, COLUMNS)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -11,31 +11,37 @@
|
||||||
.. moduleauthor:: Mikhail Khodorovskiy <mikhail.khodorovskiy@jivesoftware.com>
|
.. moduleauthor:: Mikhail Khodorovskiy <mikhail.khodorovskiy@jivesoftware.com>
|
||||||
"""
|
"""
|
||||||
import base64
|
import base64
|
||||||
import os
|
|
||||||
import urllib
|
|
||||||
import requests
|
|
||||||
import itertools
|
import itertools
|
||||||
|
import os
|
||||||
|
|
||||||
from lemur.certificates.models import Certificate
|
import requests
|
||||||
|
from flask import current_app
|
||||||
|
|
||||||
|
from lemur.common.defaults import common_name
|
||||||
|
from lemur.common.utils import parse_certificate
|
||||||
from lemur.plugins.bases import DestinationPlugin
|
from lemur.plugins.bases import DestinationPlugin
|
||||||
|
|
||||||
DEFAULT_API_VERSION = 'v1'
|
DEFAULT_API_VERSION = 'v1'
|
||||||
|
|
||||||
|
|
||||||
def ensure_resource(k8s_api, k8s_base_uri, namespace, kind, name, data):
|
def ensure_resource(k8s_api, k8s_base_uri, namespace, kind, name, data):
|
||||||
|
|
||||||
# _resolve_uri(k8s_base_uri, namespace, kind, name, api_ver=DEFAULT_API_VERSION)
|
# _resolve_uri(k8s_base_uri, namespace, kind, name, api_ver=DEFAULT_API_VERSION)
|
||||||
url = _resolve_uri(k8s_base_uri, namespace, kind)
|
url = _resolve_uri(k8s_base_uri, namespace, kind)
|
||||||
|
current_app.logger.debug("K8S POST request URL: %s", url)
|
||||||
|
|
||||||
create_resp = k8s_api.post(url, json=data)
|
create_resp = k8s_api.post(url, json=data)
|
||||||
|
current_app.logger.debug("K8S POST response: %s", create_resp)
|
||||||
|
|
||||||
if 200 <= create_resp.status_code <= 299:
|
if 200 <= create_resp.status_code <= 299:
|
||||||
return None
|
return None
|
||||||
|
elif create_resp.json().get('reason', '') != 'AlreadyExists':
|
||||||
elif create_resp.json()['reason'] != 'AlreadyExists':
|
|
||||||
return create_resp.content
|
return create_resp.content
|
||||||
|
|
||||||
update_resp = k8s_api.put(_resolve_uri(k8s_base_uri, namespace, kind, name), json=data)
|
url = _resolve_uri(k8s_base_uri, namespace, kind, name)
|
||||||
|
current_app.logger.debug("K8S PUT request URL: %s", url)
|
||||||
|
|
||||||
|
update_resp = k8s_api.put(url, json=data)
|
||||||
|
current_app.logger.debug("K8S PUT response: %s", update_resp)
|
||||||
|
|
||||||
if not 200 <= update_resp.status_code <= 299:
|
if not 200 <= update_resp.status_code <= 299:
|
||||||
return update_resp.content
|
return update_resp.content
|
||||||
|
@ -43,11 +49,12 @@ def ensure_resource(k8s_api, k8s_base_uri, namespace, kind, name, data):
|
||||||
return
|
return
|
||||||
|
|
||||||
|
|
||||||
def _resolve_ns(k8s_base_uri, namespace, api_ver=DEFAULT_API_VERSION,):
|
def _resolve_ns(k8s_base_uri, namespace, api_ver=DEFAULT_API_VERSION):
|
||||||
api_group = 'api'
|
api_group = 'api'
|
||||||
if '/' in api_ver:
|
if '/' in api_ver:
|
||||||
api_group = 'apis'
|
api_group = 'apis'
|
||||||
return '{base}/{api_group}/{api_ver}/namespaces'.format(base=k8s_base_uri, api_group=api_group, api_ver=api_ver) + ('/' + namespace if namespace else '')
|
return '{base}/{api_group}/{api_ver}/namespaces'.format(base=k8s_base_uri, api_group=api_group, api_ver=api_ver) + (
|
||||||
|
'/' + namespace if namespace else '')
|
||||||
|
|
||||||
|
|
||||||
def _resolve_uri(k8s_base_uri, namespace, kind, name=None, api_ver=DEFAULT_API_VERSION):
|
def _resolve_uri(k8s_base_uri, namespace, kind, name=None, api_ver=DEFAULT_API_VERSION):
|
||||||
|
@ -61,6 +68,41 @@ def _resolve_uri(k8s_base_uri, namespace, kind, name=None, api_ver=DEFAULT_API_V
|
||||||
]))
|
]))
|
||||||
|
|
||||||
|
|
||||||
|
# Performs Base64 encoding of string to string using the base64.b64encode() function
|
||||||
|
# which encodes bytes to bytes.
|
||||||
|
def base64encode(string):
|
||||||
|
return base64.b64encode(string.encode()).decode()
|
||||||
|
|
||||||
|
|
||||||
|
def build_secret(secret_format, secret_name, body, private_key, cert_chain):
|
||||||
|
secret = {
|
||||||
|
'apiVersion': 'v1',
|
||||||
|
'kind': 'Secret',
|
||||||
|
'type': 'Opaque',
|
||||||
|
'metadata': {
|
||||||
|
'name': secret_name,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if secret_format == 'Full':
|
||||||
|
secret['data'] = {
|
||||||
|
'combined.pem': base64encode('%s\n%s' % (body, private_key)),
|
||||||
|
'ca.crt': base64encode(cert_chain),
|
||||||
|
'service.key': base64encode(private_key),
|
||||||
|
'service.crt': base64encode(body),
|
||||||
|
}
|
||||||
|
if secret_format == 'TLS':
|
||||||
|
secret['type'] = 'kubernetes.io/tls'
|
||||||
|
secret['data'] = {
|
||||||
|
'tls.crt': base64encode(cert_chain),
|
||||||
|
'tls.key': base64encode(private_key)
|
||||||
|
}
|
||||||
|
if secret_format == 'Certificate':
|
||||||
|
secret['data'] = {
|
||||||
|
'tls.crt': base64encode(cert_chain),
|
||||||
|
}
|
||||||
|
return secret
|
||||||
|
|
||||||
|
|
||||||
class KubernetesDestinationPlugin(DestinationPlugin):
|
class KubernetesDestinationPlugin(DestinationPlugin):
|
||||||
title = 'Kubernetes'
|
title = 'Kubernetes'
|
||||||
slug = 'kubernetes-destination'
|
slug = 'kubernetes-destination'
|
||||||
|
@ -70,35 +112,81 @@ class KubernetesDestinationPlugin(DestinationPlugin):
|
||||||
author_url = 'https://github.com/mik373/lemur'
|
author_url = 'https://github.com/mik373/lemur'
|
||||||
|
|
||||||
options = [
|
options = [
|
||||||
|
{
|
||||||
|
'name': 'secretNameFormat',
|
||||||
|
'type': 'str',
|
||||||
|
'required': False,
|
||||||
|
# Validation is difficult. This regex is used by kubectl to validate secret names:
|
||||||
|
# [a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*
|
||||||
|
# Allowing the insertion of "{common_name}" (or any other such placeholder}
|
||||||
|
# at any point in the string proved very challenging and had a tendency to
|
||||||
|
# cause my browser to hang. The specified expression will allow any valid string
|
||||||
|
# but will also accept many invalid strings.
|
||||||
|
'validation': '(?:[a-z0-9.-]|\\{common_name\\})+',
|
||||||
|
'helpMessage': 'Must be a valid secret name, possibly including "{common_name}"',
|
||||||
|
'default': '{common_name}'
|
||||||
|
},
|
||||||
{
|
{
|
||||||
'name': 'kubernetesURL',
|
'name': 'kubernetesURL',
|
||||||
'type': 'str',
|
'type': 'str',
|
||||||
'required': True,
|
'required': False,
|
||||||
'validation': '@(https?|http)://(-\.)?([^\s/?\.#-]+\.?)+(/[^\s]*)?$@iS',
|
'validation': 'https?://[a-zA-Z0-9.-]+(?::[0-9]+)?',
|
||||||
'helpMessage': 'Must be a valid Kubernetes server URL!',
|
'helpMessage': 'Must be a valid Kubernetes server URL!',
|
||||||
|
'default': 'https://kubernetes.default'
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
'name': 'kubernetesAuthToken',
|
'name': 'kubernetesAuthToken',
|
||||||
'type': 'str',
|
'type': 'str',
|
||||||
'required': True,
|
'required': False,
|
||||||
'validation': '/^$|\s+/',
|
'validation': '[0-9a-zA-Z-_.]+',
|
||||||
'helpMessage': 'Must be a valid Kubernetes server Token!',
|
'helpMessage': 'Must be a valid Kubernetes server Token!',
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
'name': 'kubernetesServerCertificate',
|
'name': 'kubernetesAuthTokenFile',
|
||||||
'type': 'str',
|
'type': 'str',
|
||||||
'required': True,
|
'required': False,
|
||||||
'validation': '/^$|\s+/',
|
'validation': '(/[^/]+)+',
|
||||||
|
'helpMessage': 'Must be a valid file path!',
|
||||||
|
'default': '/var/run/secrets/kubernetes.io/serviceaccount/token'
|
||||||
|
},
|
||||||
|
{
|
||||||
|
'name': 'kubernetesServerCertificate',
|
||||||
|
'type': 'textarea',
|
||||||
|
'required': False,
|
||||||
|
'validation': '-----BEGIN CERTIFICATE-----[a-zA-Z0-9/+\\s\\r\\n]+-----END CERTIFICATE-----',
|
||||||
'helpMessage': 'Must be a valid Kubernetes server Certificate!',
|
'helpMessage': 'Must be a valid Kubernetes server Certificate!',
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
'name': 'kubernetesServerCertificateFile',
|
||||||
|
'type': 'str',
|
||||||
|
'required': False,
|
||||||
|
'validation': '(/[^/]+)+',
|
||||||
|
'helpMessage': 'Must be a valid file path!',
|
||||||
|
'default': '/var/run/secrets/kubernetes.io/serviceaccount/ca.crt'
|
||||||
|
},
|
||||||
{
|
{
|
||||||
'name': 'kubernetesNamespace',
|
'name': 'kubernetesNamespace',
|
||||||
'type': 'str',
|
'type': 'str',
|
||||||
'required': True,
|
'required': False,
|
||||||
'validation': '/^$|\s+/',
|
'validation': '[a-z0-9]([-a-z0-9]*[a-z0-9])?',
|
||||||
'helpMessage': 'Must be a valid Kubernetes Namespace!',
|
'helpMessage': 'Must be a valid Kubernetes Namespace!',
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
'name': 'kubernetesNamespaceFile',
|
||||||
|
'type': 'str',
|
||||||
|
'required': False,
|
||||||
|
'validation': '(/[^/]+)+',
|
||||||
|
'helpMessage': 'Must be a valid file path!',
|
||||||
|
'default': '/var/run/secrets/kubernetes.io/serviceaccount/namespace'
|
||||||
|
},
|
||||||
|
{
|
||||||
|
'name': 'secretFormat',
|
||||||
|
'type': 'select',
|
||||||
|
'required': True,
|
||||||
|
'available': ['Full', 'TLS', 'Certificate'],
|
||||||
|
'helpMessage': 'The type of Secret to create.',
|
||||||
|
'default': 'Full'
|
||||||
|
}
|
||||||
]
|
]
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
def __init__(self, *args, **kwargs):
|
||||||
|
@ -106,56 +194,91 @@ class KubernetesDestinationPlugin(DestinationPlugin):
|
||||||
|
|
||||||
def upload(self, name, body, private_key, cert_chain, options, **kwargs):
|
def upload(self, name, body, private_key, cert_chain, options, **kwargs):
|
||||||
|
|
||||||
k8_bearer = self.get_option('kubernetesAuthToken', options)
|
try:
|
||||||
k8_cert = self.get_option('kubernetesServerCertificate', options)
|
|
||||||
k8_namespace = self.get_option('kubernetesNamespace', options)
|
|
||||||
k8_base_uri = self.get_option('kubernetesURL', options)
|
k8_base_uri = self.get_option('kubernetesURL', options)
|
||||||
|
secret_format = self.get_option('secretFormat', options)
|
||||||
|
k8s_api = K8sSession(
|
||||||
|
self.k8s_bearer(options),
|
||||||
|
self.k8s_cert(options)
|
||||||
|
)
|
||||||
|
cn = common_name(parse_certificate(body))
|
||||||
|
secret_name_format = self.get_option('secretNameFormat', options)
|
||||||
|
secret_name = secret_name_format.format(common_name=cn)
|
||||||
|
secret = build_secret(secret_format, secret_name, body, private_key, cert_chain)
|
||||||
|
err = ensure_resource(
|
||||||
|
k8s_api,
|
||||||
|
k8s_base_uri=k8_base_uri,
|
||||||
|
namespace=self.k8s_namespace(options),
|
||||||
|
kind="secret",
|
||||||
|
name=secret_name,
|
||||||
|
data=secret
|
||||||
|
)
|
||||||
|
|
||||||
k8s_api = K8sSession(k8_bearer, k8_cert)
|
except Exception as e:
|
||||||
|
current_app.logger.exception("Exception in upload: {}".format(e), exc_info=True)
|
||||||
cert = Certificate(body=body)
|
raise
|
||||||
|
|
||||||
# in the future once runtime properties can be passed-in - use passed-in secret name
|
|
||||||
secret_name = 'certs-' + urllib.quote_plus(cert.name)
|
|
||||||
|
|
||||||
err = ensure_resource(k8s_api, k8s_base_uri=k8_base_uri, namespace=k8_namespace, kind="secret", name=secret_name, data={
|
|
||||||
'apiVersion': 'v1',
|
|
||||||
'kind': 'Secret',
|
|
||||||
'metadata': {
|
|
||||||
'name': secret_name,
|
|
||||||
},
|
|
||||||
'data': {
|
|
||||||
'combined.pem': base64.b64encode(body + private_key),
|
|
||||||
'ca.crt': base64.b64encode(cert_chain),
|
|
||||||
'service.key': base64.b64encode(private_key),
|
|
||||||
'service.crt': base64.b64encode(body),
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
if err is not None:
|
if err is not None:
|
||||||
|
current_app.logger.error("Error deploying resource: %s", err)
|
||||||
raise Exception("Error uploading secret: " + err)
|
raise Exception("Error uploading secret: " + err)
|
||||||
|
|
||||||
|
def k8s_bearer(self, options):
|
||||||
|
bearer = self.get_option('kubernetesAuthToken', options)
|
||||||
|
if not bearer:
|
||||||
|
bearer_file = self.get_option('kubernetesAuthTokenFile', options)
|
||||||
|
with open(bearer_file, "r") as file:
|
||||||
|
bearer = file.readline()
|
||||||
|
if bearer:
|
||||||
|
current_app.logger.debug("Using token read from %s", bearer_file)
|
||||||
|
else:
|
||||||
|
raise Exception("Unable to locate token in options or from %s", bearer_file)
|
||||||
|
else:
|
||||||
|
current_app.logger.debug("Using token from options")
|
||||||
|
return bearer
|
||||||
|
|
||||||
|
def k8s_cert(self, options):
|
||||||
|
cert_file = self.get_option('kubernetesServerCertificateFile', options)
|
||||||
|
cert = self.get_option('kubernetesServerCertificate', options)
|
||||||
|
if cert:
|
||||||
|
cert_file = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'k8.cert')
|
||||||
|
with open(cert_file, "w") as text_file:
|
||||||
|
text_file.write(cert)
|
||||||
|
current_app.logger.debug("Using certificate from options")
|
||||||
|
else:
|
||||||
|
current_app.logger.debug("Using certificate from %s", cert_file)
|
||||||
|
return cert_file
|
||||||
|
|
||||||
|
def k8s_namespace(self, options):
|
||||||
|
namespace = self.get_option('kubernetesNamespace', options)
|
||||||
|
if not namespace:
|
||||||
|
namespace_file = self.get_option('kubernetesNamespaceFile', options)
|
||||||
|
with open(namespace_file, "r") as file:
|
||||||
|
namespace = file.readline()
|
||||||
|
if namespace:
|
||||||
|
current_app.logger.debug("Using namespace %s from %s", namespace, namespace_file)
|
||||||
|
else:
|
||||||
|
raise Exception("Unable to locate namespace in options or from %s", namespace_file)
|
||||||
|
else:
|
||||||
|
current_app.logger.debug("Using namespace %s from options", namespace)
|
||||||
|
return namespace
|
||||||
|
|
||||||
|
|
||||||
class K8sSession(requests.Session):
|
class K8sSession(requests.Session):
|
||||||
|
|
||||||
def __init__(self, bearer, cert):
|
def __init__(self, bearer, cert_file):
|
||||||
super(K8sSession, self).__init__()
|
super(K8sSession, self).__init__()
|
||||||
|
|
||||||
self.headers.update({
|
self.headers.update({
|
||||||
'Authorization': 'Bearer %s' % bearer
|
'Authorization': 'Bearer %s' % bearer
|
||||||
})
|
})
|
||||||
|
|
||||||
k8_ca = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'k8.cert')
|
self.verify = cert_file
|
||||||
|
|
||||||
with open(k8_ca, "w") as text_file:
|
def request(self, method, url, params=None, data=None, headers=None, cookies=None, files=None, auth=None,
|
||||||
text_file.write(cert)
|
timeout=30, allow_redirects=True, proxies=None, hooks=None, stream=None, verify=None, cert=None,
|
||||||
|
json=None):
|
||||||
self.verify = k8_ca
|
|
||||||
|
|
||||||
def request(self, method, url, params=None, data=None, headers=None, cookies=None, files=None, auth=None, timeout=30, allow_redirects=True, proxies=None,
|
|
||||||
hooks=None, stream=None, verify=None, cert=None, json=None):
|
|
||||||
"""
|
"""
|
||||||
This method overrides the default timeout to be 10s.
|
This method overrides the default timeout to be 10s.
|
||||||
"""
|
"""
|
||||||
return super(K8sSession, self).request(method, url, params, data, headers, cookies, files, auth, timeout, allow_redirects, proxies, hooks, stream,
|
return super(K8sSession, self).request(method, url, params, data, headers, cookies, files, auth, timeout,
|
||||||
verify, cert, json)
|
allow_redirects, proxies, hooks, stream, verify, cert, json)
|
||||||
|
|
|
@ -11,7 +11,7 @@ from flask_principal import identity_changed, Identity
|
||||||
from lemur import create_app
|
from lemur import create_app
|
||||||
from lemur.database import db as _db
|
from lemur.database import db as _db
|
||||||
from lemur.auth.service import create_token
|
from lemur.auth.service import create_token
|
||||||
from lemur.tests.vectors import SAN_CERT_KEY
|
from lemur.tests.vectors import SAN_CERT_KEY, INTERMEDIATE_KEY
|
||||||
|
|
||||||
from .factories import ApiKeyFactory, AuthorityFactory, NotificationFactory, DestinationFactory, \
|
from .factories import ApiKeyFactory, AuthorityFactory, NotificationFactory, DestinationFactory, \
|
||||||
CertificateFactory, UserFactory, RoleFactory, SourceFactory, EndpointFactory, \
|
CertificateFactory, UserFactory, RoleFactory, SourceFactory, EndpointFactory, \
|
||||||
|
@ -231,6 +231,11 @@ def private_key():
|
||||||
return load_pem_private_key(SAN_CERT_KEY.encode(), password=None, backend=default_backend())
|
return load_pem_private_key(SAN_CERT_KEY.encode(), password=None, backend=default_backend())
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def issuer_private_key():
|
||||||
|
return load_pem_private_key(INTERMEDIATE_KEY.encode(), password=None, backend=default_backend())
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def cert_builder(private_key):
|
def cert_builder(private_key):
|
||||||
return (x509.CertificateBuilder()
|
return (x509.CertificateBuilder()
|
||||||
|
|
|
@ -1,3 +1,7 @@
|
||||||
|
from cryptography import x509
|
||||||
|
from cryptography.hazmat.backends import default_backend
|
||||||
|
from cryptography.hazmat.primitives import hashes
|
||||||
|
|
||||||
from .vectors import SAN_CERT, WILDCARD_CERT, INTERMEDIATE_CERT
|
from .vectors import SAN_CERT, WILDCARD_CERT, INTERMEDIATE_CERT
|
||||||
|
|
||||||
|
|
||||||
|
@ -41,12 +45,14 @@ def test_cert_issuer(client):
|
||||||
def test_text_to_slug(client):
|
def test_text_to_slug(client):
|
||||||
from lemur.common.defaults import text_to_slug
|
from lemur.common.defaults import text_to_slug
|
||||||
assert text_to_slug('test - string') == 'test-string'
|
assert text_to_slug('test - string') == 'test-string'
|
||||||
|
assert text_to_slug('test - string', '') == 'teststring'
|
||||||
# Accented characters are decomposed
|
# Accented characters are decomposed
|
||||||
assert text_to_slug('föö bär') == 'foo-bar'
|
assert text_to_slug('föö bär') == 'foo-bar'
|
||||||
# Melt away the Unicode Snowman
|
# Melt away the Unicode Snowman
|
||||||
assert text_to_slug('\u2603') == ''
|
assert text_to_slug('\u2603') == ''
|
||||||
assert text_to_slug('\u2603test\u2603') == 'test'
|
assert text_to_slug('\u2603test\u2603') == 'test'
|
||||||
assert text_to_slug('snow\u2603man') == 'snow-man'
|
assert text_to_slug('snow\u2603man') == 'snow-man'
|
||||||
|
assert text_to_slug('snow\u2603man', '') == 'snowman'
|
||||||
# IDNA-encoded domain names should be kept as-is
|
# IDNA-encoded domain names should be kept as-is
|
||||||
assert text_to_slug('xn--i1b6eqas.xn--xmpl-loa9b3671b.com') == 'xn--i1b6eqas.xn--xmpl-loa9b3671b.com'
|
assert text_to_slug('xn--i1b6eqas.xn--xmpl-loa9b3671b.com') == 'xn--i1b6eqas.xn--xmpl-loa9b3671b.com'
|
||||||
|
|
||||||
|
@ -75,3 +81,29 @@ def test_create_name(client):
|
||||||
datetime(2015, 5, 12, 0, 0, 0),
|
datetime(2015, 5, 12, 0, 0, 0),
|
||||||
False
|
False
|
||||||
) == 'xn--mnchen-3ya.de-VertrauenswurdigAutoritat-20150507-20150512'
|
) == 'xn--mnchen-3ya.de-VertrauenswurdigAutoritat-20150507-20150512'
|
||||||
|
|
||||||
|
|
||||||
|
def test_issuer(client, cert_builder, issuer_private_key):
|
||||||
|
from lemur.common.defaults import issuer
|
||||||
|
|
||||||
|
assert issuer(INTERMEDIATE_CERT) == 'LemurTrustUnittestsRootCA2018'
|
||||||
|
|
||||||
|
# We need to override builder's issuer name
|
||||||
|
cert_builder._issuer_name = None
|
||||||
|
# Unicode issuer name
|
||||||
|
cert = (cert_builder
|
||||||
|
.issuer_name(x509.Name([x509.NameAttribute(x509.NameOID.COMMON_NAME, 'Vertrauenswürdig Autorität')]))
|
||||||
|
.sign(issuer_private_key, hashes.SHA256(), default_backend()))
|
||||||
|
assert issuer(cert) == 'VertrauenswurdigAutoritat'
|
||||||
|
|
||||||
|
# Fallback to 'Organization' field when issuer CN is missing
|
||||||
|
cert = (cert_builder
|
||||||
|
.issuer_name(x509.Name([x509.NameAttribute(x509.NameOID.ORGANIZATION_NAME, 'No Such Organization')]))
|
||||||
|
.sign(issuer_private_key, hashes.SHA256(), default_backend()))
|
||||||
|
assert issuer(cert) == 'NoSuchOrganization'
|
||||||
|
|
||||||
|
# Missing issuer name
|
||||||
|
cert = (cert_builder
|
||||||
|
.issuer_name(x509.Name([]))
|
||||||
|
.sign(issuer_private_key, hashes.SHA256(), default_backend()))
|
||||||
|
assert issuer(cert) == 'Unknown'
|
||||||
|
|
Loading…
Reference in New Issue