From a59bc1f436f885ac0b6379d6c722956563071d48 Mon Sep 17 00:00:00 2001 From: kevgliss Date: Sat, 28 Jan 2017 16:40:37 -0800 Subject: [PATCH] Fixes (#680) * Adding some additional logging. --- lemur/certificates/models.py | 13 +-- lemur/common/fields.py | 140 +++++++++++++++++++++---------- lemur/tests/test_certificates.py | 8 +- 3 files changed, 105 insertions(+), 56 deletions(-) diff --git a/lemur/certificates/models.py b/lemur/certificates/models.py index 4855d37a..b17dc162 100644 --- a/lemur/certificates/models.py +++ b/lemur/certificates/models.py @@ -230,21 +230,24 @@ class Certificate(db.Model): @property def extensions(self): - return_extensions = {} + # setup default values + return_extensions = { + 'sub_alt_names': {'names': []} + } cert = lemur.common.utils.parse_certificate(self.body) for extension in cert.extensions: value = extension.value if isinstance(value, x509.BasicConstraints): - return_extensions['basic_constraints'] = extension.value + return_extensions['basic_constraints'] = value elif isinstance(value, x509.SubjectAlternativeName): - return_extensions['sub_alt_names'] = {'names': extension.value} + return_extensions['sub_alt_names']['names'] = value elif isinstance(value, x509.ExtendedKeyUsage): - return_extensions['extended_key_usage'] = extension.value + return_extensions['extended_key_usage'] = value elif isinstance(value, x509.KeyUsage): - return_extensions['key_usage'] = extension.value + return_extensions['key_usage'] = value elif isinstance(value, x509.SubjectKeyIdentifier): return_extensions['subject_key_identifier'] = {'include_ski': True} diff --git a/lemur/common/fields.py b/lemur/common/fields.py index e01aafb6..12e9aa08 100644 --- a/lemur/common/fields.py +++ b/lemur/common/fields.py @@ -1,12 +1,23 @@ +""" +.. module: lemur.common.fields + :platform: Unix + :copyright: (c) 2015 by Netflix Inc., see AUTHORS for more + :license: Apache, see LICENSE for more details. +.. moduleauthor:: Kevin Glisson +""" import arrow import warnings -from datetime import datetime as dt -from marshmallow.fields import Field -from marshmallow import utils -from cryptography import x509 -from marshmallow.exceptions import ValidationError import ipaddress +from flask import current_app +from datetime import datetime as dt + +from cryptography import x509 + +from marshmallow import utils +from marshmallow.fields import Field +from marshmallow.exceptions import ValidationError + class ArrowDateTime(Field): """A formatted datetime string in UTC. @@ -191,23 +202,34 @@ class ExtendedKeyUsageExtension(Field): for usage in usages: if usage.dotted_string == x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH: usage_list['useClientAuthentication'] = True - if usage.dotted_string == x509.oid.ExtendedKeyUsageOID.SERVER_AUTH: + + elif usage.dotted_string == x509.oid.ExtendedKeyUsageOID.SERVER_AUTH: usage_list['useServerAuthentication'] = True - if usage.dotted_string == x509.oid.ExtendedKeyUsageOID.CODE_SIGNING: + + elif usage.dotted_string == x509.oid.ExtendedKeyUsageOID.CODE_SIGNING: usage_list['useCodeSigning'] = True - if usage.dotted_string == x509.oid.ExtendedKeyUsageOID.EMAIL_PROTECTION: + + elif usage.dotted_string == x509.oid.ExtendedKeyUsageOID.EMAIL_PROTECTION: usage_list['useEmailProtection'] = True - if usage.dotted_string == x509.oid.ExtendedKeyUsageOID.TIME_STAMPING: + + elif usage.dotted_string == x509.oid.ExtendedKeyUsageOID.TIME_STAMPING: usage_list['useTimestamping'] = True - if usage.dotted_string == x509.oid.ExtendedKeyUsageOID.OCSP_SIGNING: + + elif usage.dotted_string == x509.oid.ExtendedKeyUsageOID.OCSP_SIGNING: usage_list['useOCSPSigning'] = True - if usage.dotted_string == '1.3.6.1.5.5.7.3.14': + + elif usage.dotted_string == '1.3.6.1.5.5.7.3.14': usage_list['useEapOverLAN'] = True - if usage.dotted_string == '1.3.6.1.5.5.7.3.13': + + elif usage.dotted_string == '1.3.6.1.5.5.7.3.13': usage_list['useEapOverPPP'] = True - if usage.dotted_string == '1.3.6.1.4.1.311.20.2.2': + + elif usage.dotted_string == '1.3.6.1.4.1.311.20.2.2': usage_list['useSmartCardLogon'] = True + else: + current_app.logger.warning('Unable to serialize ExtendedKeyUsage with OID: {usage}'.format(usage=usage.dotted_string)) + return usage_list def _deserialize(self, value, attr, data): @@ -216,30 +238,33 @@ class ExtendedKeyUsageExtension(Field): if k == 'useClientAuthentication' and v: usage_oids.append(x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH) - if k == 'useServerAuthentication' and v: + elif k == 'useServerAuthentication' and v: usage_oids.append(x509.oid.ExtendedKeyUsageOID.SERVER_AUTH) - if k == 'useCodeSigning' and v: + elif k == 'useCodeSigning' and v: usage_oids.append(x509.oid.ExtendedKeyUsageOID.CODE_SIGNING) - if k == 'useEmailProtection' and v: + elif k == 'useEmailProtection' and v: usage_oids.append(x509.oid.ExtendedKeyUsageOID.EMAIL_PROTECTION) - if k == 'useTimestamping' and v: + elif k == 'useTimestamping' and v: usage_oids.append(x509.oid.ExtendedKeyUsageOID.TIME_STAMPING) - if k == 'useOCSPSigning' and v: + elif k == 'useOCSPSigning' and v: usage_oids.append(x509.oid.ExtendedKeyUsageOID.OCSP_SIGNING) - if k == 'useEapOverLAN' and v: + elif k == 'useEapOverLAN' and v: usage_oids.append(x509.oid.ObjectIdentifier("1.3.6.1.5.5.7.3.14")) - if k == 'useEapOverPPP' and v: + elif k == 'useEapOverPPP' and v: usage_oids.append(x509.oid.ObjectIdentifier("1.3.6.1.5.5.7.3.13")) - if k == 'useSmartCardLogon' and v: + elif k == 'useSmartCardLogon' and v: usage_oids.append(x509.oid.ObjectIdentifier("1.3.6.1.4.1.311.20.2.2")) + else: + current_app.logger.warning('Unable to deserialize ExtendedKeyUsage with name: {key}'.format(key=k)) + return x509.ExtendedKeyUsage(usage_oids) @@ -280,22 +305,33 @@ class SubjectAlternativeNameExtension(Field): def _serialize(self, value, attr, obj): general_names = [] name_type = None - for name in value._general_names: - value = name.value - if isinstance(name, x509.DNSName): - name_type = 'DNSName' - if isinstance(name, x509.IPAddress): - name_type = 'IPAddress' - if isinstance(name, x509.UniformResourceIdentifier): - name_type = 'uniformResourceIdentifier' - if isinstance(name, x509.DirectoryName): - name_type = 'directoryName' - if isinstance(name, x509.RFC822Name): - name_type = 'rfc822Name' - if isinstance(name, x509.RegisteredID): - name_type = 'registeredID' - value = value.dotted_string - general_names.append({'nameType': name_type, 'value': value}) + + if value: + for name in value._general_names: + value = name.value + + if isinstance(name, x509.DNSName): + name_type = 'DNSName' + + elif isinstance(name, x509.IPAddress): + name_type = 'IPAddress' + + elif isinstance(name, x509.UniformResourceIdentifier): + name_type = 'uniformResourceIdentifier' + + elif isinstance(name, x509.DirectoryName): + name_type = 'directoryName' + + elif isinstance(name, x509.RFC822Name): + name_type = 'rfc822Name' + + elif isinstance(name, x509.RegisteredID): + name_type = 'registeredID' + value = value.dotted_string + else: + current_app.logger.warning('Unknown SubAltName type: {name}'.format(name=name)) + + general_names.append({'nameType': name_type, 'value': value}) return general_names @@ -304,13 +340,17 @@ class SubjectAlternativeNameExtension(Field): for name in value: if name['nameType'] == 'DNSName': general_names.append(x509.DNSName(name['value'])) - if name['nameType'] == 'IPAddress': + + elif name['nameType'] == 'IPAddress': general_names.append(x509.IPAddress(ipaddress.ip_address(name['value']))) - if name['nameType'] == 'IPNetwork': + + elif name['nameType'] == 'IPNetwork': general_names.append(x509.IPAddress(ipaddress.ip_network(name['value']))) - if name['nameType'] == 'uniformResourceIdentifier': + + elif name['nameType'] == 'uniformResourceIdentifier': general_names.append(x509.UniformResourceIdentifier(name['value'])) - if name['nameType'] == 'directoryName': + + elif name['nameType'] == 'directoryName': # TODO: Need to parse a string in name['value'] like: # 'CN=Common Name, O=Org Name, OU=OrgUnit Name, C=US, ST=ST, L=City/emailAddress=person@example.com' # or @@ -327,19 +367,27 @@ class SubjectAlternativeNameExtension(Field): # ] # general_names.append(x509.DirectoryName(x509.Name(BLAH)))) pass - if name['nameType'] == 'rfc822Name': + + elif name['nameType'] == 'rfc822Name': general_names.append(x509.RFC822Name(name['value'])) - if name['nameType'] == 'registeredID': + + elif name['nameType'] == 'registeredID': general_names.append(x509.RegisteredID(x509.ObjectIdentifier(name['value']))) - if name['nameType'] == 'otherName': + + elif name['nameType'] == 'otherName': # This has two inputs (type and value), so it doesn't fit the mold of the rest of these GeneralName entities. # general_names.append(x509.OtherName(name['type'], bytes(name['value']), 'utf-8')) pass - if name['nameType'] == 'x400Address': + + elif name['nameType'] == 'x400Address': # The Python Cryptography library doesn't support x400Address types (yet?) pass - if name['nameType'] == 'EDIPartyName': + + elif name['nameType'] == 'EDIPartyName': # The Python Cryptography library doesn't support EDIPartyName types (yet?) pass + else: + current_app.logger.warning('Unable to deserialize SubAltName with type: {name_type}'.format(name_type=name['nameType'])) + return x509.SubjectAlternativeName(general_names) diff --git a/lemur/tests/test_certificates.py b/lemur/tests/test_certificates.py index 75b4f2b8..5b20879f 100644 --- a/lemur/tests/test_certificates.py +++ b/lemur/tests/test_certificates.py @@ -123,17 +123,15 @@ def test_extension_schema(client): }, 'subjectKeyIdentifier': { 'includeSKI': True - }, - 'subAltNames': { - 'names': [ - {'nameType': 'DNSName', 'value': 'test.example.com'} - ] } } data, errors = ExtensionSchema().load(input_data) assert not errors + data, errors = ExtensionSchema().dump(data) + assert not errors + def test_certificate_input_schema(client, authority): from lemur.certificates.schemas import CertificateInputSchema