Use special issuer values <selfsigned> and <unknown> in special cases
This way it's easy to find/distinguish selfsigned certificates stored in Lemur.
This commit is contained in:
parent
1a2712cdf1
commit
51248c1938
|
@ -3,6 +3,8 @@ import unicodedata
|
||||||
|
|
||||||
from cryptography import x509
|
from cryptography import x509
|
||||||
from flask import current_app
|
from flask import current_app
|
||||||
|
|
||||||
|
from lemur.common.utils import is_selfsigned
|
||||||
from lemur.extensions import sentry
|
from lemur.extensions import sentry
|
||||||
from lemur.constants import SAN_NAMING_TEMPLATE, DEFAULT_NAMING_TEMPLATE
|
from lemur.constants import SAN_NAMING_TEMPLATE, DEFAULT_NAMING_TEMPLATE
|
||||||
|
|
||||||
|
@ -229,15 +231,22 @@ def issuer(cert):
|
||||||
"""
|
"""
|
||||||
Gets a sane issuer slug from a given certificate, stripping non-alphanumeric characters.
|
Gets a sane issuer slug from a given certificate, stripping non-alphanumeric characters.
|
||||||
|
|
||||||
:param cert:
|
For self-signed certificates, the special value '<selfsigned>' is returned.
|
||||||
|
If issuer cannot be determined, '<unknown>' is returned.
|
||||||
|
|
||||||
|
:param cert: Parsed certificate object
|
||||||
:return: Issuer slug
|
:return: Issuer slug
|
||||||
"""
|
"""
|
||||||
|
# If certificate is self-signed, we return a special value -- there really is no distinct "issuer" for it
|
||||||
|
if is_selfsigned(cert):
|
||||||
|
return '<selfsigned>'
|
||||||
|
|
||||||
# Try Common Name or fall back to Organization name
|
# Try Common Name or fall back to Organization name
|
||||||
attrs = (cert.issuer.get_attributes_for_oid(x509.OID_COMMON_NAME) or
|
attrs = (cert.issuer.get_attributes_for_oid(x509.OID_COMMON_NAME) or
|
||||||
cert.issuer.get_attributes_for_oid(x509.OID_ORGANIZATION_NAME))
|
cert.issuer.get_attributes_for_oid(x509.OID_ORGANIZATION_NAME))
|
||||||
if not attrs:
|
if not attrs:
|
||||||
current_app.logger.error("Unable to get issuer! Cert serial {:x}".format(cert.serial_number))
|
current_app.logger.error("Unable to get issuer! Cert serial {:x}".format(cert.serial_number))
|
||||||
return "Unknown"
|
return '<unknown>'
|
||||||
|
|
||||||
return text_to_slug(attrs[0].value, '')
|
return text_to_slug(attrs[0].value, '')
|
||||||
|
|
||||||
|
|
|
@ -11,9 +11,10 @@ import string
|
||||||
|
|
||||||
import sqlalchemy
|
import sqlalchemy
|
||||||
from cryptography import x509
|
from cryptography import x509
|
||||||
|
from cryptography.exceptions import InvalidSignature, UnsupportedAlgorithm
|
||||||
from cryptography.hazmat.backends import default_backend
|
from cryptography.hazmat.backends import default_backend
|
||||||
from cryptography.hazmat.primitives import hashes
|
from cryptography.hazmat.primitives import hashes
|
||||||
from cryptography.hazmat.primitives.asymmetric import rsa, ec
|
from cryptography.hazmat.primitives.asymmetric import rsa, ec, padding
|
||||||
from cryptography.hazmat.primitives.serialization import load_pem_private_key
|
from cryptography.hazmat.primitives.serialization import load_pem_private_key
|
||||||
from flask_restful.reqparse import RequestParser
|
from flask_restful.reqparse import RequestParser
|
||||||
from sqlalchemy import and_, func
|
from sqlalchemy import and_, func
|
||||||
|
@ -143,6 +144,40 @@ def generate_private_key(key_type):
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def check_cert_signature(cert, issuer_public_key):
|
||||||
|
"""
|
||||||
|
Check a certificate's signature against an issuer public key.
|
||||||
|
On success, returns None; on failure, raises UnsupportedAlgorithm or InvalidSignature.
|
||||||
|
"""
|
||||||
|
if isinstance(issuer_public_key, rsa.RSAPublicKey):
|
||||||
|
# RSA requires padding, just to make life difficult for us poor developers :(
|
||||||
|
if cert.signature_algorithm_oid == x509.SignatureAlgorithmOID.RSASSA_PSS:
|
||||||
|
# In 2005, IETF devised a more secure padding scheme to replace PKCS #1 v1.5. To make sure that
|
||||||
|
# nobody can easily support or use it, they mandated lots of complicated parameters, unlike any
|
||||||
|
# other X.509 signature scheme.
|
||||||
|
# https://tools.ietf.org/html/rfc4056
|
||||||
|
raise UnsupportedAlgorithm("RSASSA-PSS not supported")
|
||||||
|
else:
|
||||||
|
padder = padding.PKCS1v15()
|
||||||
|
issuer_public_key.verify(cert.signature, cert.tbs_certificate_bytes, padder, cert.signature_hash_algorithm)
|
||||||
|
else:
|
||||||
|
# EllipticCurvePublicKey or DSAPublicKey
|
||||||
|
issuer_public_key.verify(cert.signature, cert.tbs_certificate_bytes, cert.signature_hash_algorithm)
|
||||||
|
|
||||||
|
|
||||||
|
def is_selfsigned(cert):
|
||||||
|
"""
|
||||||
|
Returns True if the certificate is self-signed.
|
||||||
|
Returns False for failed verification or unsupported signing algorithm.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
check_cert_signature(cert, cert.public_key())
|
||||||
|
# If verification was successful, it's self-signed.
|
||||||
|
return True
|
||||||
|
except InvalidSignature:
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
def is_weekend(date):
|
def is_weekend(date):
|
||||||
"""
|
"""
|
||||||
Determines if a given date is on a weekend.
|
Determines if a given date is on a weekend.
|
||||||
|
|
|
@ -3,6 +3,8 @@ import os
|
||||||
import datetime
|
import datetime
|
||||||
import pytest
|
import pytest
|
||||||
from cryptography import x509
|
from cryptography import x509
|
||||||
|
from cryptography.hazmat.backends import default_backend
|
||||||
|
from cryptography.hazmat.primitives import hashes
|
||||||
from flask import current_app
|
from flask import current_app
|
||||||
from flask_principal import identity_changed, Identity
|
from flask_principal import identity_changed, Identity
|
||||||
|
|
||||||
|
@ -263,6 +265,12 @@ def cert_builder(private_key):
|
||||||
.not_valid_after(datetime.datetime(2040, 1, 1)))
|
.not_valid_after(datetime.datetime(2040, 1, 1)))
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def selfsigned_cert(cert_builder, private_key):
|
||||||
|
# cert_builder uses the same cert public key as 'private_key'
|
||||||
|
return cert_builder.sign(private_key, hashes.SHA256(), default_backend())
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(scope='function')
|
@pytest.fixture(scope='function')
|
||||||
def aws_credentials():
|
def aws_credentials():
|
||||||
os.environ['AWS_ACCESS_KEY_ID'] = 'testing'
|
os.environ['AWS_ACCESS_KEY_ID'] = 'testing'
|
||||||
|
|
|
@ -81,6 +81,13 @@ def test_create_name(client):
|
||||||
datetime(2015, 5, 12, 0, 0, 0),
|
datetime(2015, 5, 12, 0, 0, 0),
|
||||||
False
|
False
|
||||||
) == 'xn--mnchen-3ya.de-VertrauenswurdigAutoritat-20150507-20150512'
|
) == 'xn--mnchen-3ya.de-VertrauenswurdigAutoritat-20150507-20150512'
|
||||||
|
assert certificate_name(
|
||||||
|
'selfie.example.org',
|
||||||
|
'<selfsigned>',
|
||||||
|
datetime(2015, 5, 7, 0, 0, 0),
|
||||||
|
datetime(2025, 5, 12, 13, 37, 0),
|
||||||
|
False
|
||||||
|
) == 'selfie.example.org-selfsigned-20150507-20250512'
|
||||||
|
|
||||||
|
|
||||||
def test_issuer(client, cert_builder, issuer_private_key):
|
def test_issuer(client, cert_builder, issuer_private_key):
|
||||||
|
@ -106,4 +113,9 @@ def test_issuer(client, cert_builder, issuer_private_key):
|
||||||
cert = (cert_builder
|
cert = (cert_builder
|
||||||
.issuer_name(x509.Name([]))
|
.issuer_name(x509.Name([]))
|
||||||
.sign(issuer_private_key, hashes.SHA256(), default_backend()))
|
.sign(issuer_private_key, hashes.SHA256(), default_backend()))
|
||||||
assert issuer(cert) == 'Unknown'
|
assert issuer(cert) == '<unknown>'
|
||||||
|
|
||||||
|
|
||||||
|
def test_issuer_selfsigned(selfsigned_cert):
|
||||||
|
from lemur.common.defaults import issuer
|
||||||
|
assert issuer(selfsigned_cert) == '<selfsigned>'
|
||||||
|
|
|
@ -1,5 +1,7 @@
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
|
from lemur.tests.vectors import SAN_CERT, INTERMEDIATE_CERT, ROOTCA_CERT
|
||||||
|
|
||||||
|
|
||||||
def test_generate_private_key():
|
def test_generate_private_key():
|
||||||
from lemur.common.utils import generate_private_key
|
from lemur.common.utils import generate_private_key
|
||||||
|
@ -71,3 +73,13 @@ KFfxwrO1
|
||||||
-----END CERTIFICATE-----'''
|
-----END CERTIFICATE-----'''
|
||||||
authority_key = get_authority_key(test_cert)
|
authority_key = get_authority_key(test_cert)
|
||||||
assert authority_key == 'feacb541be81771293affa412d8dc9f66a3ebb80'
|
assert authority_key == 'feacb541be81771293affa412d8dc9f66a3ebb80'
|
||||||
|
|
||||||
|
|
||||||
|
def test_is_selfsigned(selfsigned_cert):
|
||||||
|
from lemur.common.utils import is_selfsigned
|
||||||
|
|
||||||
|
assert is_selfsigned(selfsigned_cert) is True
|
||||||
|
assert is_selfsigned(SAN_CERT) is False
|
||||||
|
assert is_selfsigned(INTERMEDIATE_CERT) is False
|
||||||
|
# Root CA certificates are also technically self-signed
|
||||||
|
assert is_selfsigned(ROOTCA_CERT) is True
|
||||||
|
|
|
@ -45,6 +45,7 @@ ssvobJ6Xe2D4cCVjUmsqtFEztMgdqgmlcWyGdUKeXdi7CMoeTb4uO+9qRQq46wYW
|
||||||
n7K1z+W0Kp5yhnnPAoOioAP4vjASDx3z3RnLaZvMmcO7YdCIwhE5oGV0
|
n7K1z+W0Kp5yhnnPAoOioAP4vjASDx3z3RnLaZvMmcO7YdCIwhE5oGV0
|
||||||
-----END CERTIFICATE-----
|
-----END CERTIFICATE-----
|
||||||
"""
|
"""
|
||||||
|
ROOTCA_CERT = parse_certificate(ROOTCA_CERT_STR)
|
||||||
ROOTCA_KEY = """\
|
ROOTCA_KEY = """\
|
||||||
-----BEGIN RSA PRIVATE KEY-----
|
-----BEGIN RSA PRIVATE KEY-----
|
||||||
MIIEowIBAAKCAQEAvyVpe0tfIzri3l3PYH2r7hW86wKF58GLY+Ua52rEO5E3eXQq
|
MIIEowIBAAKCAQEAvyVpe0tfIzri3l3PYH2r7hW86wKF58GLY+Ua52rEO5E3eXQq
|
||||||
|
|
Loading…
Reference in New Issue