* Adding some additional logging.
This commit is contained in:
kevgliss 2017-01-28 16:40:37 -08:00 committed by GitHub
parent c24810b876
commit a59bc1f436
3 changed files with 105 additions and 56 deletions

View File

@ -230,21 +230,24 @@ class Certificate(db.Model):
@property @property
def extensions(self): def extensions(self):
return_extensions = {} # setup default values
return_extensions = {
'sub_alt_names': {'names': []}
}
cert = lemur.common.utils.parse_certificate(self.body) cert = lemur.common.utils.parse_certificate(self.body)
for extension in cert.extensions: for extension in cert.extensions:
value = extension.value value = extension.value
if isinstance(value, x509.BasicConstraints): if isinstance(value, x509.BasicConstraints):
return_extensions['basic_constraints'] = extension.value return_extensions['basic_constraints'] = value
elif isinstance(value, x509.SubjectAlternativeName): elif isinstance(value, x509.SubjectAlternativeName):
return_extensions['sub_alt_names'] = {'names': extension.value} return_extensions['sub_alt_names']['names'] = value
elif isinstance(value, x509.ExtendedKeyUsage): elif isinstance(value, x509.ExtendedKeyUsage):
return_extensions['extended_key_usage'] = extension.value return_extensions['extended_key_usage'] = value
elif isinstance(value, x509.KeyUsage): elif isinstance(value, x509.KeyUsage):
return_extensions['key_usage'] = extension.value return_extensions['key_usage'] = value
elif isinstance(value, x509.SubjectKeyIdentifier): elif isinstance(value, x509.SubjectKeyIdentifier):
return_extensions['subject_key_identifier'] = {'include_ski': True} return_extensions['subject_key_identifier'] = {'include_ski': True}

View File

@ -1,12 +1,23 @@
"""
.. module: lemur.common.fields
:platform: Unix
:copyright: (c) 2015 by Netflix Inc., see AUTHORS for more
:license: Apache, see LICENSE for more details.
.. moduleauthor:: Kevin Glisson <kglisson@netflix.com>
"""
import arrow import arrow
import warnings import warnings
from datetime import datetime as dt
from marshmallow.fields import Field
from marshmallow import utils
from cryptography import x509
from marshmallow.exceptions import ValidationError
import ipaddress import ipaddress
from flask import current_app
from datetime import datetime as dt
from cryptography import x509
from marshmallow import utils
from marshmallow.fields import Field
from marshmallow.exceptions import ValidationError
class ArrowDateTime(Field): class ArrowDateTime(Field):
"""A formatted datetime string in UTC. """A formatted datetime string in UTC.
@ -191,23 +202,34 @@ class ExtendedKeyUsageExtension(Field):
for usage in usages: for usage in usages:
if usage.dotted_string == x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH: if usage.dotted_string == x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH:
usage_list['useClientAuthentication'] = True usage_list['useClientAuthentication'] = True
if usage.dotted_string == x509.oid.ExtendedKeyUsageOID.SERVER_AUTH:
elif usage.dotted_string == x509.oid.ExtendedKeyUsageOID.SERVER_AUTH:
usage_list['useServerAuthentication'] = True usage_list['useServerAuthentication'] = True
if usage.dotted_string == x509.oid.ExtendedKeyUsageOID.CODE_SIGNING:
elif usage.dotted_string == x509.oid.ExtendedKeyUsageOID.CODE_SIGNING:
usage_list['useCodeSigning'] = True usage_list['useCodeSigning'] = True
if usage.dotted_string == x509.oid.ExtendedKeyUsageOID.EMAIL_PROTECTION:
elif usage.dotted_string == x509.oid.ExtendedKeyUsageOID.EMAIL_PROTECTION:
usage_list['useEmailProtection'] = True usage_list['useEmailProtection'] = True
if usage.dotted_string == x509.oid.ExtendedKeyUsageOID.TIME_STAMPING:
elif usage.dotted_string == x509.oid.ExtendedKeyUsageOID.TIME_STAMPING:
usage_list['useTimestamping'] = True usage_list['useTimestamping'] = True
if usage.dotted_string == x509.oid.ExtendedKeyUsageOID.OCSP_SIGNING:
elif usage.dotted_string == x509.oid.ExtendedKeyUsageOID.OCSP_SIGNING:
usage_list['useOCSPSigning'] = True usage_list['useOCSPSigning'] = True
if usage.dotted_string == '1.3.6.1.5.5.7.3.14':
elif usage.dotted_string == '1.3.6.1.5.5.7.3.14':
usage_list['useEapOverLAN'] = True usage_list['useEapOverLAN'] = True
if usage.dotted_string == '1.3.6.1.5.5.7.3.13':
elif usage.dotted_string == '1.3.6.1.5.5.7.3.13':
usage_list['useEapOverPPP'] = True usage_list['useEapOverPPP'] = True
if usage.dotted_string == '1.3.6.1.4.1.311.20.2.2':
elif usage.dotted_string == '1.3.6.1.4.1.311.20.2.2':
usage_list['useSmartCardLogon'] = True usage_list['useSmartCardLogon'] = True
else:
current_app.logger.warning('Unable to serialize ExtendedKeyUsage with OID: {usage}'.format(usage=usage.dotted_string))
return usage_list return usage_list
def _deserialize(self, value, attr, data): def _deserialize(self, value, attr, data):
@ -216,30 +238,33 @@ class ExtendedKeyUsageExtension(Field):
if k == 'useClientAuthentication' and v: if k == 'useClientAuthentication' and v:
usage_oids.append(x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH) usage_oids.append(x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH)
if k == 'useServerAuthentication' and v: elif k == 'useServerAuthentication' and v:
usage_oids.append(x509.oid.ExtendedKeyUsageOID.SERVER_AUTH) usage_oids.append(x509.oid.ExtendedKeyUsageOID.SERVER_AUTH)
if k == 'useCodeSigning' and v: elif k == 'useCodeSigning' and v:
usage_oids.append(x509.oid.ExtendedKeyUsageOID.CODE_SIGNING) usage_oids.append(x509.oid.ExtendedKeyUsageOID.CODE_SIGNING)
if k == 'useEmailProtection' and v: elif k == 'useEmailProtection' and v:
usage_oids.append(x509.oid.ExtendedKeyUsageOID.EMAIL_PROTECTION) usage_oids.append(x509.oid.ExtendedKeyUsageOID.EMAIL_PROTECTION)
if k == 'useTimestamping' and v: elif k == 'useTimestamping' and v:
usage_oids.append(x509.oid.ExtendedKeyUsageOID.TIME_STAMPING) usage_oids.append(x509.oid.ExtendedKeyUsageOID.TIME_STAMPING)
if k == 'useOCSPSigning' and v: elif k == 'useOCSPSigning' and v:
usage_oids.append(x509.oid.ExtendedKeyUsageOID.OCSP_SIGNING) usage_oids.append(x509.oid.ExtendedKeyUsageOID.OCSP_SIGNING)
if k == 'useEapOverLAN' and v: elif k == 'useEapOverLAN' and v:
usage_oids.append(x509.oid.ObjectIdentifier("1.3.6.1.5.5.7.3.14")) usage_oids.append(x509.oid.ObjectIdentifier("1.3.6.1.5.5.7.3.14"))
if k == 'useEapOverPPP' and v: elif k == 'useEapOverPPP' and v:
usage_oids.append(x509.oid.ObjectIdentifier("1.3.6.1.5.5.7.3.13")) usage_oids.append(x509.oid.ObjectIdentifier("1.3.6.1.5.5.7.3.13"))
if k == 'useSmartCardLogon' and v: elif k == 'useSmartCardLogon' and v:
usage_oids.append(x509.oid.ObjectIdentifier("1.3.6.1.4.1.311.20.2.2")) usage_oids.append(x509.oid.ObjectIdentifier("1.3.6.1.4.1.311.20.2.2"))
else:
current_app.logger.warning('Unable to deserialize ExtendedKeyUsage with name: {key}'.format(key=k))
return x509.ExtendedKeyUsage(usage_oids) return x509.ExtendedKeyUsage(usage_oids)
@ -280,21 +305,32 @@ class SubjectAlternativeNameExtension(Field):
def _serialize(self, value, attr, obj): def _serialize(self, value, attr, obj):
general_names = [] general_names = []
name_type = None name_type = None
if value:
for name in value._general_names: for name in value._general_names:
value = name.value value = name.value
if isinstance(name, x509.DNSName): if isinstance(name, x509.DNSName):
name_type = 'DNSName' name_type = 'DNSName'
if isinstance(name, x509.IPAddress):
elif isinstance(name, x509.IPAddress):
name_type = 'IPAddress' name_type = 'IPAddress'
if isinstance(name, x509.UniformResourceIdentifier):
elif isinstance(name, x509.UniformResourceIdentifier):
name_type = 'uniformResourceIdentifier' name_type = 'uniformResourceIdentifier'
if isinstance(name, x509.DirectoryName):
elif isinstance(name, x509.DirectoryName):
name_type = 'directoryName' name_type = 'directoryName'
if isinstance(name, x509.RFC822Name):
elif isinstance(name, x509.RFC822Name):
name_type = 'rfc822Name' name_type = 'rfc822Name'
if isinstance(name, x509.RegisteredID):
elif isinstance(name, x509.RegisteredID):
name_type = 'registeredID' name_type = 'registeredID'
value = value.dotted_string value = value.dotted_string
else:
current_app.logger.warning('Unknown SubAltName type: {name}'.format(name=name))
general_names.append({'nameType': name_type, 'value': value}) general_names.append({'nameType': name_type, 'value': value})
return general_names return general_names
@ -304,13 +340,17 @@ class SubjectAlternativeNameExtension(Field):
for name in value: for name in value:
if name['nameType'] == 'DNSName': if name['nameType'] == 'DNSName':
general_names.append(x509.DNSName(name['value'])) general_names.append(x509.DNSName(name['value']))
if name['nameType'] == 'IPAddress':
elif name['nameType'] == 'IPAddress':
general_names.append(x509.IPAddress(ipaddress.ip_address(name['value']))) general_names.append(x509.IPAddress(ipaddress.ip_address(name['value'])))
if name['nameType'] == 'IPNetwork':
elif name['nameType'] == 'IPNetwork':
general_names.append(x509.IPAddress(ipaddress.ip_network(name['value']))) general_names.append(x509.IPAddress(ipaddress.ip_network(name['value'])))
if name['nameType'] == 'uniformResourceIdentifier':
elif name['nameType'] == 'uniformResourceIdentifier':
general_names.append(x509.UniformResourceIdentifier(name['value'])) general_names.append(x509.UniformResourceIdentifier(name['value']))
if name['nameType'] == 'directoryName':
elif name['nameType'] == 'directoryName':
# TODO: Need to parse a string in name['value'] like: # TODO: Need to parse a string in name['value'] like:
# 'CN=Common Name, O=Org Name, OU=OrgUnit Name, C=US, ST=ST, L=City/emailAddress=person@example.com' # 'CN=Common Name, O=Org Name, OU=OrgUnit Name, C=US, ST=ST, L=City/emailAddress=person@example.com'
# or # or
@ -327,19 +367,27 @@ class SubjectAlternativeNameExtension(Field):
# ] # ]
# general_names.append(x509.DirectoryName(x509.Name(BLAH)))) # general_names.append(x509.DirectoryName(x509.Name(BLAH))))
pass pass
if name['nameType'] == 'rfc822Name':
elif name['nameType'] == 'rfc822Name':
general_names.append(x509.RFC822Name(name['value'])) general_names.append(x509.RFC822Name(name['value']))
if name['nameType'] == 'registeredID':
elif name['nameType'] == 'registeredID':
general_names.append(x509.RegisteredID(x509.ObjectIdentifier(name['value']))) general_names.append(x509.RegisteredID(x509.ObjectIdentifier(name['value'])))
if name['nameType'] == 'otherName':
elif name['nameType'] == 'otherName':
# This has two inputs (type and value), so it doesn't fit the mold of the rest of these GeneralName entities. # This has two inputs (type and value), so it doesn't fit the mold of the rest of these GeneralName entities.
# general_names.append(x509.OtherName(name['type'], bytes(name['value']), 'utf-8')) # general_names.append(x509.OtherName(name['type'], bytes(name['value']), 'utf-8'))
pass pass
if name['nameType'] == 'x400Address':
elif name['nameType'] == 'x400Address':
# The Python Cryptography library doesn't support x400Address types (yet?) # The Python Cryptography library doesn't support x400Address types (yet?)
pass pass
if name['nameType'] == 'EDIPartyName':
elif name['nameType'] == 'EDIPartyName':
# The Python Cryptography library doesn't support EDIPartyName types (yet?) # The Python Cryptography library doesn't support EDIPartyName types (yet?)
pass pass
else:
current_app.logger.warning('Unable to deserialize SubAltName with type: {name_type}'.format(name_type=name['nameType']))
return x509.SubjectAlternativeName(general_names) return x509.SubjectAlternativeName(general_names)

View File

@ -123,17 +123,15 @@ def test_extension_schema(client):
}, },
'subjectKeyIdentifier': { 'subjectKeyIdentifier': {
'includeSKI': True 'includeSKI': True
},
'subAltNames': {
'names': [
{'nameType': 'DNSName', 'value': 'test.example.com'}
]
} }
} }
data, errors = ExtensionSchema().load(input_data) data, errors = ExtensionSchema().load(input_data)
assert not errors assert not errors
data, errors = ExtensionSchema().dump(data)
assert not errors
def test_certificate_input_schema(client, authority): def test_certificate_input_schema(client, authority):
from lemur.certificates.schemas import CertificateInputSchema from lemur.certificates.schemas import CertificateInputSchema