diff --git a/lemur/certificates/cli.py b/lemur/certificates/cli.py index f5a84d18..013a4cb1 100644 --- a/lemur/certificates/cli.py +++ b/lemur/certificates/cli.py @@ -363,7 +363,10 @@ def check_revoked(): else: status = verify_string(cert.body, "") - cert.status = 'valid' if status else 'revoked' + if status is None: + cert.status = 'unknown' + else: + cert.status = 'valid' if status else 'revoked' except Exception as e: sentry.captureException() diff --git a/lemur/certificates/verify.py b/lemur/certificates/verify.py index 3d847bdf..d42e306c 100644 --- a/lemur/certificates/verify.py +++ b/lemur/certificates/verify.py @@ -7,6 +7,7 @@ """ import requests import subprocess +from flask import current_app from requests.exceptions import ConnectionError, InvalidSchema from cryptography import x509 from cryptography.hazmat.backends import default_backend @@ -14,13 +15,16 @@ from cryptography.hazmat.backends import default_backend from lemur.utils import mktempfile from lemur.common.utils import parse_certificate +crl_cache = {} -def ocsp_verify(cert_path, issuer_chain_path): + +def ocsp_verify(cert, cert_path, issuer_chain_path): """ Attempts to verify a certificate via OCSP. OCSP is a more modern version of CRL in that it will query the OCSP URI in order to determine if the certificate has been revoked + :param cert: :param cert_path: :param issuer_chain_path: :return bool: True if certificate is valid, False otherwise @@ -29,8 +33,14 @@ def ocsp_verify(cert_path, issuer_chain_path): p1 = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) url, err = p1.communicate() + if not url: + current_app.logger.debug("No OCSP URL in certificate {}".format(cert.serial_number)) + return None + p2 = subprocess.Popen(['openssl', 'ocsp', '-issuer', issuer_chain_path, - '-cert', cert_path, "-url", url.strip()], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + '-cert', cert_path, "-url", url.strip()], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) message, err = p2.communicate() @@ -40,7 +50,8 @@ def ocsp_verify(cert_path, issuer_chain_path): raise Exception("Got error when parsing OCSP url") elif 'revoked' in p_message: - return + current_app.logger.debug("OCSP reports certificate revoked: {}".format(cert.serial_number)) + return False elif 'good' not in p_message: raise Exception("Did not receive a valid response") @@ -48,47 +59,60 @@ def ocsp_verify(cert_path, issuer_chain_path): return True -def crl_verify(cert_path): +def crl_verify(cert, cert_path): """ Attempts to verify a certificate using CRL. + :param cert: :param cert_path: :return: True if certificate is valid, False otherwise :raise Exception: If certificate does not have CRL """ - with open(cert_path, 'rt') as c: - cert = parse_certificate(c.read()) - - distribution_points = cert.extensions.get_extension_for_oid(x509.OID_CRL_DISTRIBUTION_POINTS).value + try: + distribution_points = cert.extensions.get_extension_for_oid( + x509.OID_CRL_DISTRIBUTION_POINTS + ).value + except x509.ExtensionNotFound: + current_app.logger.debug("No CRLDP extension in certificate {}".format(cert.serial_number)) + return None for p in distribution_points: point = p.full_name[0].value - try: - response = requests.get(point) + if point not in crl_cache: + current_app.logger.debug("Retrieving CRL: {}".format(point)) + try: + response = requests.get(point) - if response.status_code != 200: + if response.status_code != 200: + raise Exception("Unable to retrieve CRL: {0}".format(point)) + except InvalidSchema: + # Unhandled URI scheme (like ldap://); skip this distribution point. + continue + except ConnectionError: raise Exception("Unable to retrieve CRL: {0}".format(point)) - except InvalidSchema: - # Unhandled URI scheme (like ldap://); skip this distribution point. - continue - except ConnectionError: - raise Exception("Unable to retrieve CRL: {0}".format(point)) - crl = x509.load_der_x509_crl(response.content, backend=default_backend()) + crl_cache[point] = x509.load_der_x509_crl(response.content, + backend=default_backend()) + else: + current_app.logger.debug("CRL point is cached {}".format(point)) - for r in crl: - if cert.serial == r.serial_number: + for r in crl_cache[point]: + if cert.serial_number == r.serial_number: try: reason = r.extensions.get_extension_for_class(x509.CRLReason).value - # Handle "removeFromCRL" revoke reason as unrevoked; continue with the next distribution point. - # Per RFC 5280 section 6.3.3 (k): https://tools.ietf.org/html/rfc5280#section-6.3.3 + # Handle "removeFromCRL" revoke reason as unrevoked; + # continue with the next distribution point. + # Per RFC 5280 section 6.3.3 (k): + # https://tools.ietf.org/html/rfc5280#section-6.3.3 if reason == x509.ReasonFlags.remove_from_crl: break except x509.ExtensionNotFound: pass - return + current_app.logger.debug("CRL reports certificate " + "revoked: {}".format(cert.serial_number)) + return False return True @@ -101,15 +125,24 @@ def verify(cert_path, issuer_chain_path): :param issuer_chain_path: :return: True if valid, False otherwise """ + with open(cert_path, 'rt') as c: + try: + cert = parse_certificate(c.read()) + except ValueError as e: + current_app.logger.error(e) + return None + # OCSP is our main source of truth, in a lot of cases CRLs # have been deprecated and are no longer updated - try: - return ocsp_verify(cert_path, issuer_chain_path) - except Exception as e: - try: - return crl_verify(cert_path) - except Exception as e: - raise Exception("Failed to verify") + verify_result = ocsp_verify(cert, cert_path, issuer_chain_path) + + if verify_result is None: + verify_result = crl_verify(cert, cert_path) + + if verify_result is None: + current_app.logger.debug("Failed to verify {}".format(cert.serial_number)) + + return verify_result def verify_string(cert_string, issuer_string): diff --git a/lemur/tests/test_verify.py b/lemur/tests/test_verify.py index 4a7854b2..a1f0f5eb 100644 --- a/lemur/tests/test_verify.py +++ b/lemur/tests/test_verify.py @@ -12,9 +12,8 @@ from .vectors import INTERMEDIATE_CERT_STR def test_verify_simple_cert(): """Simple certificate without CRL or OCSP.""" - # Verification raises an exception for "unknown" if there are no means to verify it - with pytest.raises(Exception, match="Failed to verify"): - verify_string(INTERMEDIATE_CERT_STR, '') + # Verification returns None if there are no means to verify a cert + assert verify_string(INTERMEDIATE_CERT_STR, '') is None def test_verify_crl_unknown_scheme(cert_builder, private_key): @@ -31,7 +30,7 @@ def test_verify_crl_unknown_scheme(cert_builder, private_key): f.write(cert.public_bytes(serialization.Encoding.PEM)) # Must not raise exception - crl_verify(cert_tmp) + crl_verify(cert, cert_tmp) def test_verify_crl_unreachable(cert_builder, private_key): @@ -48,4 +47,4 @@ def test_verify_crl_unreachable(cert_builder, private_key): f.write(cert.public_bytes(serialization.Encoding.PEM)) with pytest.raises(Exception, match="Unable to retrieve CRL:"): - crl_verify(cert_tmp) + crl_verify(cert, cert_tmp)