From 7dbca821c3d0a77d4971b1608bf24f90e30ad96b Mon Sep 17 00:00:00 2001 From: Mike Culbertson Date: Fri, 31 Aug 2018 12:01:49 -0400 Subject: [PATCH 1/7] Reducing the stacked exceptions plus a bit of pep8 --- lemur/certificates/cli.py | 5 ++- lemur/certificates/verify.py | 60 ++++++++++++++++++++++++------------ 2 files changed, 44 insertions(+), 21 deletions(-) diff --git a/lemur/certificates/cli.py b/lemur/certificates/cli.py index f5a84d18..013a4cb1 100644 --- a/lemur/certificates/cli.py +++ b/lemur/certificates/cli.py @@ -363,7 +363,10 @@ def check_revoked(): else: status = verify_string(cert.body, "") - cert.status = 'valid' if status else 'revoked' + if status is None: + cert.status = 'unknown' + else: + cert.status = 'valid' if status else 'revoked' except Exception as e: sentry.captureException() diff --git a/lemur/certificates/verify.py b/lemur/certificates/verify.py index 3d847bdf..0cb4a177 100644 --- a/lemur/certificates/verify.py +++ b/lemur/certificates/verify.py @@ -7,6 +7,7 @@ """ import requests import subprocess +from flask import current_app from requests.exceptions import ConnectionError, InvalidSchema from cryptography import x509 from cryptography.hazmat.backends import default_backend @@ -14,6 +15,7 @@ from cryptography.hazmat.backends import default_backend from lemur.utils import mktempfile from lemur.common.utils import parse_certificate +crl_cache = {} def ocsp_verify(cert_path, issuer_chain_path): """ @@ -29,6 +31,10 @@ def ocsp_verify(cert_path, issuer_chain_path): p1 = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) url, err = p1.communicate() + if not url: + current_app.logger.debug("No OCSP URL in certificate") + return None + p2 = subprocess.Popen(['openssl', 'ocsp', '-issuer', issuer_chain_path, '-cert', cert_path, "-url", url.strip()], stdout=subprocess.PIPE, stderr=subprocess.PIPE) @@ -57,28 +63,39 @@ def crl_verify(cert_path): :raise Exception: If certificate does not have CRL """ with open(cert_path, 'rt') as c: - cert = parse_certificate(c.read()) + try: + cert = parse_certificate(c.read()) + except Exception as e: + current_app.logger.error(e) + return None - distribution_points = cert.extensions.get_extension_for_oid(x509.OID_CRL_DISTRIBUTION_POINTS).value + try: + distribution_points = cert.extensions.get_extension_for_oid( + x509.OID_CRL_DISTRIBUTION_POINTS + ).value + except x509.ExtensionNotFound: + current_app.logger.warn("No CRLDP extension in certificate") + return None for p in distribution_points: point = p.full_name[0].value - try: - response = requests.get(point) + if point not in crl_cache: + try: + response = requests.get(point) - if response.status_code != 200: + if response.status_code != 200: + raise Exception("Unable to retrieve CRL: {0}".format(point)) + except InvalidSchema: + # Unhandled URI scheme (like ldap://); skip this distribution point. + continue + except ConnectionError: raise Exception("Unable to retrieve CRL: {0}".format(point)) - except InvalidSchema: - # Unhandled URI scheme (like ldap://); skip this distribution point. - continue - except ConnectionError: - raise Exception("Unable to retrieve CRL: {0}".format(point)) - crl = x509.load_der_x509_crl(response.content, backend=default_backend()) + crl_cache[point] = x509.load_der_x509_crl(response.content, backend=default_backend()) - for r in crl: - if cert.serial == r.serial_number: + for r in crl_cache[point]: + if cert.serial_number == r.serial_number: try: reason = r.extensions.get_extension_for_class(x509.CRLReason).value # Handle "removeFromCRL" revoke reason as unrevoked; continue with the next distribution point. @@ -86,6 +103,7 @@ def crl_verify(cert_path): if reason == x509.ReasonFlags.remove_from_crl: break except x509.ExtensionNotFound: + current_app.logger.warn("extension not found in CRL?") pass return @@ -103,13 +121,15 @@ def verify(cert_path, issuer_chain_path): """ # OCSP is our main source of truth, in a lot of cases CRLs # have been deprecated and are no longer updated - try: - return ocsp_verify(cert_path, issuer_chain_path) - except Exception as e: - try: - return crl_verify(cert_path) - except Exception as e: - raise Exception("Failed to verify") + verify_result = ocsp_verify(cert_path, issuer_chain_path) + + if verify_result is None: + verify_result = crl_verify(cert_path) + + if verify_result is None: + current_app.logger.warn("Failed to verify") + + return verify_result def verify_string(cert_string, issuer_string): From 34c88494b852c61b0fe00f381ea08692ed1115a7 Mon Sep 17 00:00:00 2001 From: Mike Culbertson Date: Fri, 31 Aug 2018 12:19:55 -0400 Subject: [PATCH 2/7] More specific exception catch for cert parsing. line shortening. --- lemur/certificates/verify.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/lemur/certificates/verify.py b/lemur/certificates/verify.py index 0cb4a177..1e7d9075 100644 --- a/lemur/certificates/verify.py +++ b/lemur/certificates/verify.py @@ -36,7 +36,9 @@ def ocsp_verify(cert_path, issuer_chain_path): return None p2 = subprocess.Popen(['openssl', 'ocsp', '-issuer', issuer_chain_path, - '-cert', cert_path, "-url", url.strip()], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + '-cert', cert_path, "-url", url.strip()], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) message, err = p2.communicate() @@ -65,7 +67,7 @@ def crl_verify(cert_path): with open(cert_path, 'rt') as c: try: cert = parse_certificate(c.read()) - except Exception as e: + except ValueError as e: current_app.logger.error(e) return None @@ -92,14 +94,17 @@ def crl_verify(cert_path): except ConnectionError: raise Exception("Unable to retrieve CRL: {0}".format(point)) - crl_cache[point] = x509.load_der_x509_crl(response.content, backend=default_backend()) + crl_cache[point] = x509.load_der_x509_crl(response.content, + backend=default_backend()) for r in crl_cache[point]: if cert.serial_number == r.serial_number: try: reason = r.extensions.get_extension_for_class(x509.CRLReason).value - # Handle "removeFromCRL" revoke reason as unrevoked; continue with the next distribution point. - # Per RFC 5280 section 6.3.3 (k): https://tools.ietf.org/html/rfc5280#section-6.3.3 + # Handle "removeFromCRL" revoke reason as unrevoked; + # continue with the next distribution point. + # Per RFC 5280 section 6.3.3 (k): + # https://tools.ietf.org/html/rfc5280#section-6.3.3 if reason == x509.ReasonFlags.remove_from_crl: break except x509.ExtensionNotFound: From 2815ddf6c8ffb2c3a1fb555024936101c9961551 Mon Sep 17 00:00:00 2001 From: Mike Culbertson Date: Fri, 31 Aug 2018 13:34:55 -0400 Subject: [PATCH 3/7] Moved cert object to be passed to both ocsp/crl methods so we can report in better detail on the certs. Ensured proper returns of False (revoked) True (good) None (unknown) throughout the methods. --- lemur/certificates/verify.py | 39 ++++++++++++++++++++---------------- 1 file changed, 22 insertions(+), 17 deletions(-) diff --git a/lemur/certificates/verify.py b/lemur/certificates/verify.py index 1e7d9075..073ab404 100644 --- a/lemur/certificates/verify.py +++ b/lemur/certificates/verify.py @@ -17,7 +17,7 @@ from lemur.common.utils import parse_certificate crl_cache = {} -def ocsp_verify(cert_path, issuer_chain_path): +def ocsp_verify(cert, cert_path, issuer_chain_path): """ Attempts to verify a certificate via OCSP. OCSP is a more modern version of CRL in that it will query the OCSP URI in order to determine if the @@ -32,7 +32,7 @@ def ocsp_verify(cert_path, issuer_chain_path): url, err = p1.communicate() if not url: - current_app.logger.debug("No OCSP URL in certificate") + current_app.logger.debug("No OCSP URL in certificate {}".format(cert.serial_number)) return None p2 = subprocess.Popen(['openssl', 'ocsp', '-issuer', issuer_chain_path, @@ -48,7 +48,8 @@ def ocsp_verify(cert_path, issuer_chain_path): raise Exception("Got error when parsing OCSP url") elif 'revoked' in p_message: - return + current_app.logger.debug("OCSP reports certificate revoked: {}".format(cert.serial_number)) + return False elif 'good' not in p_message: raise Exception("Did not receive a valid response") @@ -56,7 +57,7 @@ def ocsp_verify(cert_path, issuer_chain_path): return True -def crl_verify(cert_path): +def crl_verify(cert, cert_path): """ Attempts to verify a certificate using CRL. @@ -64,25 +65,19 @@ def crl_verify(cert_path): :return: True if certificate is valid, False otherwise :raise Exception: If certificate does not have CRL """ - with open(cert_path, 'rt') as c: - try: - cert = parse_certificate(c.read()) - except ValueError as e: - current_app.logger.error(e) - return None - try: distribution_points = cert.extensions.get_extension_for_oid( x509.OID_CRL_DISTRIBUTION_POINTS ).value except x509.ExtensionNotFound: - current_app.logger.warn("No CRLDP extension in certificate") + current_app.logger.debug("No CRLDP extension in certificate {}".format(cert.serial_number)) return None for p in distribution_points: point = p.full_name[0].value if point not in crl_cache: + current_app.logger.debug("Retrieving CRL: {}".format(point)) try: response = requests.get(point) @@ -96,6 +91,8 @@ def crl_verify(cert_path): crl_cache[point] = x509.load_der_x509_crl(response.content, backend=default_backend()) + else: + current_app.logger.debug("CRL point is cached {}".format(point)) for r in crl_cache[point]: if cert.serial_number == r.serial_number: @@ -108,10 +105,11 @@ def crl_verify(cert_path): if reason == x509.ReasonFlags.remove_from_crl: break except x509.ExtensionNotFound: - current_app.logger.warn("extension not found in CRL?") pass - return + current_app.logger.debug("CRL reports certificate " + "revoked: {}".format(cert.serial_number)) + return False return True @@ -124,15 +122,22 @@ def verify(cert_path, issuer_chain_path): :param issuer_chain_path: :return: True if valid, False otherwise """ + with open(cert_path, 'rt') as c: + try: + cert = parse_certificate(c.read()) + except ValueError as e: + current_app.logger.error(e) + return None + # OCSP is our main source of truth, in a lot of cases CRLs # have been deprecated and are no longer updated - verify_result = ocsp_verify(cert_path, issuer_chain_path) + verify_result = ocsp_verify(cert, cert_path, issuer_chain_path) if verify_result is None: - verify_result = crl_verify(cert_path) + verify_result = crl_verify(cert, cert_path) if verify_result is None: - current_app.logger.warn("Failed to verify") + current_app.logger.debug("Failed to verify {}".format(cert.serial_number)) return verify_result From 652d7f65dd86d0e1c68cf494cf5393741c9526ce Mon Sep 17 00:00:00 2001 From: Mike Culbertson Date: Thu, 27 Sep 2018 09:28:21 -0400 Subject: [PATCH 4/7] flake8 tweak --- lemur/certificates/verify.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/lemur/certificates/verify.py b/lemur/certificates/verify.py index 073ab404..ad047639 100644 --- a/lemur/certificates/verify.py +++ b/lemur/certificates/verify.py @@ -17,6 +17,7 @@ from lemur.common.utils import parse_certificate crl_cache = {} + def ocsp_verify(cert, cert_path, issuer_chain_path): """ Attempts to verify a certificate via OCSP. OCSP is a more modern version @@ -67,8 +68,8 @@ def crl_verify(cert, cert_path): """ try: distribution_points = cert.extensions.get_extension_for_oid( - x509.OID_CRL_DISTRIBUTION_POINTS - ).value + x509.OID_CRL_DISTRIBUTION_POINTS + ).value except x509.ExtensionNotFound: current_app.logger.debug("No CRLDP extension in certificate {}".format(cert.serial_number)) return None From f19b6382bc38a5ce91de3746ee3af31e14117684 Mon Sep 17 00:00:00 2001 From: Mike Culbertson Date: Thu, 27 Sep 2018 10:10:04 -0400 Subject: [PATCH 5/7] Updated verify tests --- lemur/tests/test_verify.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/lemur/tests/test_verify.py b/lemur/tests/test_verify.py index 4a7854b2..0572d0d3 100644 --- a/lemur/tests/test_verify.py +++ b/lemur/tests/test_verify.py @@ -13,8 +13,7 @@ from .vectors import INTERMEDIATE_CERT_STR def test_verify_simple_cert(): """Simple certificate without CRL or OCSP.""" # Verification raises an exception for "unknown" if there are no means to verify it - with pytest.raises(Exception, match="Failed to verify"): - verify_string(INTERMEDIATE_CERT_STR, '') + assert verify_string(INTERMEDIATE_CERT_STR, '') == None def test_verify_crl_unknown_scheme(cert_builder, private_key): @@ -31,7 +30,7 @@ def test_verify_crl_unknown_scheme(cert_builder, private_key): f.write(cert.public_bytes(serialization.Encoding.PEM)) # Must not raise exception - crl_verify(cert_tmp) + crl_verify(cert, cert_tmp) def test_verify_crl_unreachable(cert_builder, private_key): @@ -48,4 +47,4 @@ def test_verify_crl_unreachable(cert_builder, private_key): f.write(cert.public_bytes(serialization.Encoding.PEM)) with pytest.raises(Exception, match="Unable to retrieve CRL:"): - crl_verify(cert_tmp) + crl_verify(cert, cert_tmp) From 590fac4aa88439beb6399df42e2c9538db7bedad Mon Sep 17 00:00:00 2001 From: Mike Culbertson Date: Thu, 27 Sep 2018 10:11:13 -0400 Subject: [PATCH 6/7] docstring update in verify.py --- lemur/certificates/verify.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lemur/certificates/verify.py b/lemur/certificates/verify.py index ad047639..d42e306c 100644 --- a/lemur/certificates/verify.py +++ b/lemur/certificates/verify.py @@ -24,6 +24,7 @@ def ocsp_verify(cert, cert_path, issuer_chain_path): of CRL in that it will query the OCSP URI in order to determine if the certificate has been revoked + :param cert: :param cert_path: :param issuer_chain_path: :return bool: True if certificate is valid, False otherwise @@ -62,6 +63,7 @@ def crl_verify(cert, cert_path): """ Attempts to verify a certificate using CRL. + :param cert: :param cert_path: :return: True if certificate is valid, False otherwise :raise Exception: If certificate does not have CRL From 40f444409998895831132d47b9f3c0447c737b6a Mon Sep 17 00:00:00 2001 From: Non Sequitur Date: Mon, 1 Oct 2018 22:04:31 -0400 Subject: [PATCH 7/7] Flake8 fix in test_verify.py --- lemur/tests/test_verify.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lemur/tests/test_verify.py b/lemur/tests/test_verify.py index 0572d0d3..a1f0f5eb 100644 --- a/lemur/tests/test_verify.py +++ b/lemur/tests/test_verify.py @@ -12,8 +12,8 @@ from .vectors import INTERMEDIATE_CERT_STR def test_verify_simple_cert(): """Simple certificate without CRL or OCSP.""" - # Verification raises an exception for "unknown" if there are no means to verify it - assert verify_string(INTERMEDIATE_CERT_STR, '') == None + # Verification returns None if there are no means to verify a cert + assert verify_string(INTERMEDIATE_CERT_STR, '') is None def test_verify_crl_unknown_scheme(cert_builder, private_key):