diff --git a/lemur/certificates/verify.py b/lemur/certificates/verify.py index 1e0febec..432c487c 100644 --- a/lemur/certificates/verify.py +++ b/lemur/certificates/verify.py @@ -6,14 +6,28 @@ .. moduleauthor:: Kevin Glisson """ import os -import re -import hashlib import requests import subprocess from OpenSSL import crypto +from cryptography import x509 +from cryptography.hazmat.backends import default_backend from flask import current_app +from contextlib import contextmanager +from tempfile import NamedTemporaryFile + + +@contextmanager +def mktempfile(): + with NamedTemporaryFile(delete=False) as f: + fi = f + + try: + yield fi + finally: + os.unlink(fi.name) + def ocsp_verify(cert_path, issuer_chain_path): """ @@ -53,27 +67,18 @@ def crl_verify(cert_path): :return: True if certificate is valid, False otherwise :raise Exception: If certificate does not have CRL """ - s = "(http(s)?\://[a-zA-Z0-9\-\.]+\.[a-zA-Z]{2,3}/\S*?$)" - regex = re.compile(s, re.MULTILINE) + with open(cert_path, 'rt') as c: + cert = x509.load_pem_x509_certificate(c.read(), default_backend()) - x509 = crypto.load_certificate(crypto.FILETYPE_PEM, open(cert_path, 'rt').read()) - for x in range(x509.get_extension_count()): - ext = x509.get_extension(x) - if ext.get_short_name() == 'crlDistributionPoints': - r = regex.search(ext.get_data()) - points = r.groups() - break - else: - raise Exception("Certificate does not have a CRL distribution point") - - for point in points: - if point: - response = requests.get(point) - crl = crypto.load_crl(crypto.FILETYPE_ASN1, response.content) - revoked = crl.get_revoked() - for r in revoked: - if x509.get_serial_number() == r.get_serial(): - return + distribution_points = cert.extensions.get_extension_for_oid(x509.OID_CRL_DISTRIBUTION_POINTS).value + for p in distribution_points: + point = p.full_name[0].value + response = requests.get(point) + crl = crypto.load_crl(crypto.FILETYPE_ASN1, response.content) # TODO this should be switched to cryptography when support exists + revoked = crl.get_revoked() + for r in revoked: + if cert.serial == r.get_serial(): + return return True @@ -99,22 +104,6 @@ def verify(cert_path, issuer_chain_path): raise Exception("Failed to verify") -def make_tmp_file(string): - """ - Creates a temporary file for a given string - - :param string: - :return: Full file path to created file - """ - m = hashlib.md5() - m.update(string) - hexdigest = m.hexdigest() - path = os.path.join(os.path.dirname(os.path.abspath(__file__)), hexdigest) - with open(path, 'w') as f: - f.write(string) - return path - - def verify_string(cert_string, issuer_string): """ Verify a certificate given only it's string value @@ -123,13 +112,9 @@ def verify_string(cert_string, issuer_string): :param issuer_string: :return: True if valid, False otherwise """ - cert_path = make_tmp_file(cert_string) - issuer_path = make_tmp_file(issuer_string) - status = verify(cert_path, issuer_path) - remove_tmp_file(cert_path) - remove_tmp_file(issuer_path) + with mktempfile() as cert_tmp: + cert_tmp.write(cert_string) + with mktempfile() as issuer_tmp: + issuer_tmp.write(issuer_string) + status = verify(cert_tmp.path, issuer_tmp.path) return status - - -def remove_tmp_file(file_path): - os.remove(file_path)