cleaning up temporary file creation

This commit is contained in:
kevgliss 2015-08-27 11:53:37 -07:00
parent fe7b075f7b
commit 45158c64a2
1 changed files with 32 additions and 47 deletions

View File

@ -6,14 +6,28 @@
.. moduleauthor:: Kevin Glisson <kglisson@netflix.com> .. moduleauthor:: Kevin Glisson <kglisson@netflix.com>
""" """
import os import os
import re
import hashlib
import requests import requests
import subprocess import subprocess
from OpenSSL import crypto from OpenSSL import crypto
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from flask import current_app from flask import current_app
from contextlib import contextmanager
from tempfile import NamedTemporaryFile
@contextmanager
def mktempfile():
with NamedTemporaryFile(delete=False) as f:
fi = f
try:
yield fi
finally:
os.unlink(fi.name)
def ocsp_verify(cert_path, issuer_chain_path): def ocsp_verify(cert_path, issuer_chain_path):
""" """
@ -53,26 +67,17 @@ def crl_verify(cert_path):
:return: True if certificate is valid, False otherwise :return: True if certificate is valid, False otherwise
:raise Exception: If certificate does not have CRL :raise Exception: If certificate does not have CRL
""" """
s = "(http(s)?\://[a-zA-Z0-9\-\.]+\.[a-zA-Z]{2,3}/\S*?$)" with open(cert_path, 'rt') as c:
regex = re.compile(s, re.MULTILINE) cert = x509.load_pem_x509_certificate(c.read(), default_backend())
x509 = crypto.load_certificate(crypto.FILETYPE_PEM, open(cert_path, 'rt').read()) distribution_points = cert.extensions.get_extension_for_oid(x509.OID_CRL_DISTRIBUTION_POINTS).value
for x in range(x509.get_extension_count()): for p in distribution_points:
ext = x509.get_extension(x) point = p.full_name[0].value
if ext.get_short_name() == 'crlDistributionPoints':
r = regex.search(ext.get_data())
points = r.groups()
break
else:
raise Exception("Certificate does not have a CRL distribution point")
for point in points:
if point:
response = requests.get(point) response = requests.get(point)
crl = crypto.load_crl(crypto.FILETYPE_ASN1, response.content) crl = crypto.load_crl(crypto.FILETYPE_ASN1, response.content) # TODO this should be switched to cryptography when support exists
revoked = crl.get_revoked() revoked = crl.get_revoked()
for r in revoked: for r in revoked:
if x509.get_serial_number() == r.get_serial(): if cert.serial == r.get_serial():
return return
return True return True
@ -99,22 +104,6 @@ def verify(cert_path, issuer_chain_path):
raise Exception("Failed to verify") raise Exception("Failed to verify")
def make_tmp_file(string):
"""
Creates a temporary file for a given string
:param string:
:return: Full file path to created file
"""
m = hashlib.md5()
m.update(string)
hexdigest = m.hexdigest()
path = os.path.join(os.path.dirname(os.path.abspath(__file__)), hexdigest)
with open(path, 'w') as f:
f.write(string)
return path
def verify_string(cert_string, issuer_string): def verify_string(cert_string, issuer_string):
""" """
Verify a certificate given only it's string value Verify a certificate given only it's string value
@ -123,13 +112,9 @@ def verify_string(cert_string, issuer_string):
:param issuer_string: :param issuer_string:
:return: True if valid, False otherwise :return: True if valid, False otherwise
""" """
cert_path = make_tmp_file(cert_string) with mktempfile() as cert_tmp:
issuer_path = make_tmp_file(issuer_string) cert_tmp.write(cert_string)
status = verify(cert_path, issuer_path) with mktempfile() as issuer_tmp:
remove_tmp_file(cert_path) issuer_tmp.write(issuer_string)
remove_tmp_file(issuer_path) status = verify(cert_tmp.path, issuer_tmp.path)
return status return status
def remove_tmp_file(file_path):
os.remove(file_path)