Reducing the stacked exceptions plus a bit of pep8
This commit is contained in:
parent
b3c4324728
commit
7dbca821c3
@ -363,6 +363,9 @@ def check_revoked():
|
||||
else:
|
||||
status = verify_string(cert.body, "")
|
||||
|
||||
if status is None:
|
||||
cert.status = 'unknown'
|
||||
else:
|
||||
cert.status = 'valid' if status else 'revoked'
|
||||
|
||||
except Exception as e:
|
||||
|
@ -7,6 +7,7 @@
|
||||
"""
|
||||
import requests
|
||||
import subprocess
|
||||
from flask import current_app
|
||||
from requests.exceptions import ConnectionError, InvalidSchema
|
||||
from cryptography import x509
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
@ -14,6 +15,7 @@ from cryptography.hazmat.backends import default_backend
|
||||
from lemur.utils import mktempfile
|
||||
from lemur.common.utils import parse_certificate
|
||||
|
||||
crl_cache = {}
|
||||
|
||||
def ocsp_verify(cert_path, issuer_chain_path):
|
||||
"""
|
||||
@ -29,6 +31,10 @@ def ocsp_verify(cert_path, issuer_chain_path):
|
||||
p1 = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
url, err = p1.communicate()
|
||||
|
||||
if not url:
|
||||
current_app.logger.debug("No OCSP URL in certificate")
|
||||
return None
|
||||
|
||||
p2 = subprocess.Popen(['openssl', 'ocsp', '-issuer', issuer_chain_path,
|
||||
'-cert', cert_path, "-url", url.strip()], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
|
||||
@ -57,13 +63,24 @@ def crl_verify(cert_path):
|
||||
:raise Exception: If certificate does not have CRL
|
||||
"""
|
||||
with open(cert_path, 'rt') as c:
|
||||
try:
|
||||
cert = parse_certificate(c.read())
|
||||
except Exception as e:
|
||||
current_app.logger.error(e)
|
||||
return None
|
||||
|
||||
distribution_points = cert.extensions.get_extension_for_oid(x509.OID_CRL_DISTRIBUTION_POINTS).value
|
||||
try:
|
||||
distribution_points = cert.extensions.get_extension_for_oid(
|
||||
x509.OID_CRL_DISTRIBUTION_POINTS
|
||||
).value
|
||||
except x509.ExtensionNotFound:
|
||||
current_app.logger.warn("No CRLDP extension in certificate")
|
||||
return None
|
||||
|
||||
for p in distribution_points:
|
||||
point = p.full_name[0].value
|
||||
|
||||
if point not in crl_cache:
|
||||
try:
|
||||
response = requests.get(point)
|
||||
|
||||
@ -75,10 +92,10 @@ def crl_verify(cert_path):
|
||||
except ConnectionError:
|
||||
raise Exception("Unable to retrieve CRL: {0}".format(point))
|
||||
|
||||
crl = x509.load_der_x509_crl(response.content, backend=default_backend())
|
||||
crl_cache[point] = x509.load_der_x509_crl(response.content, backend=default_backend())
|
||||
|
||||
for r in crl:
|
||||
if cert.serial == r.serial_number:
|
||||
for r in crl_cache[point]:
|
||||
if cert.serial_number == r.serial_number:
|
||||
try:
|
||||
reason = r.extensions.get_extension_for_class(x509.CRLReason).value
|
||||
# Handle "removeFromCRL" revoke reason as unrevoked; continue with the next distribution point.
|
||||
@ -86,6 +103,7 @@ def crl_verify(cert_path):
|
||||
if reason == x509.ReasonFlags.remove_from_crl:
|
||||
break
|
||||
except x509.ExtensionNotFound:
|
||||
current_app.logger.warn("extension not found in CRL?")
|
||||
pass
|
||||
|
||||
return
|
||||
@ -103,13 +121,15 @@ def verify(cert_path, issuer_chain_path):
|
||||
"""
|
||||
# OCSP is our main source of truth, in a lot of cases CRLs
|
||||
# have been deprecated and are no longer updated
|
||||
try:
|
||||
return ocsp_verify(cert_path, issuer_chain_path)
|
||||
except Exception as e:
|
||||
try:
|
||||
return crl_verify(cert_path)
|
||||
except Exception as e:
|
||||
raise Exception("Failed to verify")
|
||||
verify_result = ocsp_verify(cert_path, issuer_chain_path)
|
||||
|
||||
if verify_result is None:
|
||||
verify_result = crl_verify(cert_path)
|
||||
|
||||
if verify_result is None:
|
||||
current_app.logger.warn("Failed to verify")
|
||||
|
||||
return verify_result
|
||||
|
||||
|
||||
def verify_string(cert_string, issuer_string):
|
||||
|
Loading…
Reference in New Issue
Block a user