""" .. module: lemur.common.fields :platform: Unix :copyright: (c) 2015 by Netflix Inc., see AUTHORS for more :license: Apache, see LICENSE for more details. .. moduleauthor:: Kevin Glisson """ import arrow import warnings import ipaddress from flask import current_app from datetime import datetime as dt from cryptography import x509 from marshmallow import utils from marshmallow.fields import Field from marshmallow.exceptions import ValidationError from lemur.common import validators class Hex(Field): """ A hex formatted string. """ def _serialize(self, value, attr, obj): if value: value = hex(int(value))[2:].upper() return value class ArrowDateTime(Field): """A formatted datetime string in UTC. Example: ``'2014-12-22T03:12:58.019077+00:00'`` Timezone-naive `datetime` objects are converted to UTC (+00:00) by :meth:`Schema.dump `. :meth:`Schema.load ` returns `datetime` objects that are timezone-aware. :param str format: Either ``"rfc"`` (for RFC822), ``"iso"`` (for ISO8601), or a date format string. If `None`, defaults to "iso". :param kwargs: The same keyword arguments that :class:`Field` receives. """ DATEFORMAT_SERIALIZATION_FUNCS = { 'iso': utils.isoformat, 'iso8601': utils.isoformat, 'rfc': utils.rfcformat, 'rfc822': utils.rfcformat, } DATEFORMAT_DESERIALIZATION_FUNCS = { 'iso': utils.from_iso, 'iso8601': utils.from_iso, 'rfc': utils.from_rfc, 'rfc822': utils.from_rfc, } DEFAULT_FORMAT = 'iso' localtime = False default_error_messages = { 'invalid': 'Not a valid datetime.', 'format': '"{input}" cannot be formatted as a datetime.', } def __init__(self, format=None, **kwargs): super(ArrowDateTime, self).__init__(**kwargs) # Allow this to be None. It may be set later in the ``_serialize`` # or ``_desrialize`` methods This allows a Schema to dynamically set the # dateformat, e.g. from a Meta option self.dateformat = format def _add_to_schema(self, field_name, schema): super(ArrowDateTime, self)._add_to_schema(field_name, schema) self.dateformat = self.dateformat or schema.opts.dateformat def _serialize(self, value, attr, obj): if value is None: return None self.dateformat = self.dateformat or self.DEFAULT_FORMAT format_func = self.DATEFORMAT_SERIALIZATION_FUNCS.get(self.dateformat, None) if format_func: try: return format_func(value, localtime=self.localtime) except (AttributeError, ValueError) as err: self.fail('format', input=value) else: return value.strftime(self.dateformat) def _deserialize(self, value, attr, data): if not value: # Falsy values, e.g. '', None, [] are not valid raise self.fail('invalid') self.dateformat = self.dateformat or self.DEFAULT_FORMAT func = self.DATEFORMAT_DESERIALIZATION_FUNCS.get(self.dateformat) if func: try: return arrow.get(func(value)) except (TypeError, AttributeError, ValueError): raise self.fail('invalid') elif self.dateformat: try: return dt.datetime.strptime(value, self.dateformat) except (TypeError, AttributeError, ValueError): raise self.fail('invalid') elif utils.dateutil_available: try: return arrow.get(utils.from_datestring(value)) except TypeError: raise self.fail('invalid') else: warnings.warn('It is recommended that you install python-dateutil ' 'for improved datetime deserialization.') raise self.fail('invalid') class KeyUsageExtension(Field): """An x509.KeyUsage ExtensionType object Dict of KeyUsage names/values are deserialized into an x509.KeyUsage object and back. :param kwargs: The same keyword arguments that :class:`Field` receives. """ def _serialize(self, value, attr, obj): return { 'useDigitalSignature': value.digital_signature, 'useNonRepudiation': value.content_commitment, 'useKeyEncipherment': value.key_encipherment, 'useDataEncipherment': value.data_encipherment, 'useKeyAgreement': value.key_agreement, 'useKeyCertSign': value.key_cert_sign, 'useCRLSign': value.crl_sign, 'useEncipherOnly': value._encipher_only, 'useDecipherOnly': value._decipher_only } def _deserialize(self, value, attr, data): keyusages = { 'digital_signature': False, 'content_commitment': False, 'key_encipherment': False, 'data_encipherment': False, 'key_agreement': False, 'key_cert_sign': False, 'crl_sign': False, 'encipher_only': False, 'decipher_only': False } for k, v in value.items(): if k == 'useDigitalSignature': keyusages['digital_signature'] = v elif k == 'useNonRepudiation': keyusages['content_commitment'] = v elif k == 'useKeyEncipherment': keyusages['key_encipherment'] = v elif k == 'useDataEncipherment': keyusages['data_encipherment'] = v elif k == 'useKeyCertSign': keyusages['key_cert_sign'] = v elif k == 'useCRLSign': keyusages['crl_sign'] = v elif k == 'useKeyAgreement': keyusages['key_agreement'] = v elif k == 'useEncipherOnly' and v: keyusages['encipher_only'] = True keyusages['key_agreement'] = True elif k == 'useDecipherOnly' and v: keyusages['decipher_only'] = True keyusages['key_agreement'] = True if keyusages['encipher_only'] and keyusages['decipher_only']: raise ValidationError('A certificate cannot have both Encipher Only and Decipher Only Extended Key Usages.') return x509.KeyUsage( digital_signature=keyusages['digital_signature'], content_commitment=keyusages['content_commitment'], key_encipherment=keyusages['key_encipherment'], data_encipherment=keyusages['data_encipherment'], key_agreement=keyusages['key_agreement'], key_cert_sign=keyusages['key_cert_sign'], crl_sign=keyusages['crl_sign'], encipher_only=keyusages['encipher_only'], decipher_only=keyusages['decipher_only'] ) class ExtendedKeyUsageExtension(Field): """An x509.ExtendedKeyUsage ExtensionType object Dict of ExtendedKeyUsage names/values are deserialized into an x509.ExtendedKeyUsage object and back. :param kwargs: The same keyword arguments that :class:`Field` receives. """ def _serialize(self, value, attr, obj): usages = value._usages usage_list = {} for usage in usages: if usage == x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH: usage_list['useClientAuthentication'] = True elif usage == x509.oid.ExtendedKeyUsageOID.SERVER_AUTH: usage_list['useServerAuthentication'] = True elif usage == x509.oid.ExtendedKeyUsageOID.CODE_SIGNING: usage_list['useCodeSigning'] = True elif usage == x509.oid.ExtendedKeyUsageOID.EMAIL_PROTECTION: usage_list['useEmailProtection'] = True elif usage == x509.oid.ExtendedKeyUsageOID.TIME_STAMPING: usage_list['useTimestamping'] = True elif usage == x509.oid.ExtendedKeyUsageOID.OCSP_SIGNING: usage_list['useOCSPSigning'] = True elif usage.dotted_string == '1.3.6.1.5.5.7.3.14': usage_list['useEapOverLAN'] = True elif usage.dotted_string == '1.3.6.1.5.5.7.3.13': usage_list['useEapOverPPP'] = True elif usage.dotted_string == '1.3.6.1.4.1.311.20.2.2': usage_list['useSmartCardLogon'] = True else: current_app.logger.warning('Unable to serialize ExtendedKeyUsage with OID: {usage}'.format(usage=usage.dotted_string)) return usage_list def _deserialize(self, value, attr, data): usage_oids = [] for k, v in value.items(): if k == 'useClientAuthentication' and v: usage_oids.append(x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH) elif k == 'useServerAuthentication' and v: usage_oids.append(x509.oid.ExtendedKeyUsageOID.SERVER_AUTH) elif k == 'useCodeSigning' and v: usage_oids.append(x509.oid.ExtendedKeyUsageOID.CODE_SIGNING) elif k == 'useEmailProtection' and v: usage_oids.append(x509.oid.ExtendedKeyUsageOID.EMAIL_PROTECTION) elif k == 'useTimestamping' and v: usage_oids.append(x509.oid.ExtendedKeyUsageOID.TIME_STAMPING) elif k == 'useOCSPSigning' and v: usage_oids.append(x509.oid.ExtendedKeyUsageOID.OCSP_SIGNING) elif k == 'useEapOverLAN' and v: usage_oids.append(x509.oid.ObjectIdentifier("1.3.6.1.5.5.7.3.14")) elif k == 'useEapOverPPP' and v: usage_oids.append(x509.oid.ObjectIdentifier("1.3.6.1.5.5.7.3.13")) elif k == 'useSmartCardLogon' and v: usage_oids.append(x509.oid.ObjectIdentifier("1.3.6.1.4.1.311.20.2.2")) else: current_app.logger.warning('Unable to deserialize ExtendedKeyUsage with name: {key}'.format(key=k)) return x509.ExtendedKeyUsage(usage_oids) class BasicConstraintsExtension(Field): """An x509.BasicConstraints ExtensionType object Dict of CA boolean and a path_length integer names/values are deserialized into an x509.BasicConstraints object and back. :param kwargs: The same keyword arguments that :class:`Field` receives. """ def _serialize(self, value, attr, obj): return {'ca': value.ca, 'path_length': value.path_length} def _deserialize(self, value, attr, data): ca = value.get('ca', False) path_length = value.get('path_length', None) if ca: if not isinstance(path_length, (type(None), int)): raise ValidationError('A CA certificate path_length (for BasicConstraints) must be None or an integer.') return x509.BasicConstraints(ca=True, path_length=path_length) else: return x509.BasicConstraints(ca=False, path_length=None) class SubjectAlternativeNameExtension(Field): """An x509.SubjectAlternativeName ExtensionType object Dict of CA boolean and a path_length integer names/values are deserialized into an x509.BasicConstraints object and back. :param kwargs: The same keyword arguments that :class:`Field` receives. """ def _serialize(self, value, attr, obj): general_names = [] name_type = None if value: for name in value._general_names: value = name.value if isinstance(name, x509.DNSName): name_type = 'DNSName' elif isinstance(name, x509.IPAddress): if isinstance(value, ipaddress.IPv4Network): name_type = 'IPNetwork' else: name_type = 'IPAddress' value = str(value) elif isinstance(name, x509.UniformResourceIdentifier): name_type = 'uniformResourceIdentifier' elif isinstance(name, x509.DirectoryName): name_type = 'directoryName' elif isinstance(name, x509.RFC822Name): name_type = 'rfc822Name' elif isinstance(name, x509.RegisteredID): name_type = 'registeredID' value = value.dotted_string else: current_app.logger.warning('Unknown SubAltName type: {name}'.format(name=name)) general_names.append({'nameType': name_type, 'value': value}) return general_names def _deserialize(self, value, attr, data): general_names = [] for name in value: if name['nameType'] == 'DNSName': validators.sensitive_domain(name['value']) general_names.append(x509.DNSName(name['value'])) elif name['nameType'] == 'IPAddress': general_names.append(x509.IPAddress(ipaddress.ip_address(name['value']))) elif name['nameType'] == 'IPNetwork': general_names.append(x509.IPAddress(ipaddress.ip_network(name['value']))) elif name['nameType'] == 'uniformResourceIdentifier': general_names.append(x509.UniformResourceIdentifier(name['value'])) elif name['nameType'] == 'directoryName': # TODO: Need to parse a string in name['value'] like: # 'CN=Common Name, O=Org Name, OU=OrgUnit Name, C=US, ST=ST, L=City/emailAddress=person@example.com' # or # 'CN=Common Name/O=Org Name/OU=OrgUnit Name/C=US/ST=NH/L=City/emailAddress=person@example.com' # and turn it into something like: # x509.Name([ # x509.NameAttribute(x509.OID_COMMON_NAME, "Common Name"), # x509.NameAttribute(x509.OID_ORGANIZATION_NAME, "Org Name"), # x509.NameAttribute(x509.OID_ORGANIZATIONAL_UNIT_NAME, "OrgUnit Name"), # x509.NameAttribute(x509.OID_COUNTRY_NAME, "US"), # x509.NameAttribute(x509.OID_STATE_OR_PROVINCE_NAME, "NH"), # x509.NameAttribute(x509.OID_LOCALITY_NAME, "City"), # x509.NameAttribute(x509.OID_EMAIL_ADDRESS, "person@example.com") # ] # general_names.append(x509.DirectoryName(x509.Name(BLAH)))) pass elif name['nameType'] == 'rfc822Name': general_names.append(x509.RFC822Name(name['value'])) elif name['nameType'] == 'registeredID': general_names.append(x509.RegisteredID(x509.ObjectIdentifier(name['value']))) elif name['nameType'] == 'otherName': # This has two inputs (type and value), so it doesn't fit the mold of the rest of these GeneralName entities. # general_names.append(x509.OtherName(name['type'], bytes(name['value']), 'utf-8')) pass elif name['nameType'] == 'x400Address': # The Python Cryptography library doesn't support x400Address types (yet?) pass elif name['nameType'] == 'EDIPartyName': # The Python Cryptography library doesn't support EDIPartyName types (yet?) pass else: current_app.logger.warning('Unable to deserialize SubAltName with type: {name_type}'.format(name_type=name['nameType'])) return x509.SubjectAlternativeName(general_names)