lemur/lemur/schemas.py

213 lines
6.4 KiB
Python

"""
.. module: lemur.schemas
:platform: unix
:copyright: (c) 2015 by Netflix Inc., see AUTHORS for more
:license: Apache, see LICENSE for more details.
.. moduleauthor:: Kevin Glisson <kglisson@netflix.com>
"""
from marshmallow import fields, post_load, pre_load, post_dump, validates_schema
from lemur.authorities.models import Authority
from lemur.certificates.models import Certificate
from lemur.common import validators
from lemur.common.schema import LemurSchema, LemurInputSchema, LemurOutputSchema
from lemur.destinations.models import Destination
from lemur.notifications.models import Notification
from lemur.plugins import plugins
from lemur.roles.models import Role
from lemur.users.models import User
class AssociatedAuthoritySchema(LemurInputSchema):
    """Input schema that resolves an authority reference to its model.

    The payload may identify the authority by ``id`` or by ``name``; when
    both are usable, ``id`` is tried first.
    """
    id = fields.Int()
    name = fields.String()

    @post_load
    def get_object(self, data, many=False):
        """Swap the loaded payload for the matching ``Authority``.

        :param data: loaded payload for a single authority reference
        :param many: not read by this hook; kept for signature compatibility
        :return: the result of ``.one()`` on the ``Authority`` query
        :raises ValidationError: when neither ``id`` nor ``name`` carries a
            truthy value
        """
        # Local import: ``ValidationError`` is not part of the module-level
        # marshmallow import line.
        from marshmallow import ValidationError

        if data.get('id'):
            return Authority.query.filter(Authority.id == data['id']).one()

        if data.get('name'):
            return Authority.query.filter(Authority.name == data['name']).one()

        # Both fields are optional, so a payload can reach this point with no
        # usable identifier. Falling through used to return None, producing
        # no Authority at all; report the problem as a validation error.
        raise ValidationError('An authority must be referenced by id or name.')
class AssociatedRoleSchema(LemurInputSchema):
    """Input schema that resolves role references (by ``id``) to ``Role`` rows."""
    id = fields.Int(required=True)
    name = fields.String()

    @post_load
    def get_object(self, data, many=False):
        """Look up the ``Role`` object(s) named by the loaded payload.

        :param data: one loaded payload, or a list of them when ``many`` is set
        :param many: whether ``data`` is a list of payloads
        :return: a single ``Role`` (``.one()``) or a list of them (``.all()``)
        """
        # NOTE(review): the hook is registered without ``pass_many``; confirm
        # that the list branch below is actually reachable.
        if not many:
            return Role.query.filter(Role.id == data['id']).one()

        wanted_ids = [item['id'] for item in data]
        return Role.query.filter(Role.id.in_(wanted_ids)).all()
class AssociatedDestinationSchema(LemurInputSchema):
    """Input schema that resolves destination references (by ``id``) to ``Destination`` rows."""
    id = fields.Int(required=True)
    name = fields.String()

    @post_load
    def get_object(self, data, many=False):
        """Look up the ``Destination`` object(s) named by the loaded payload.

        :param data: one loaded payload, or a list of them when ``many`` is set
        :param many: whether ``data`` is a list of payloads
        :return: a single ``Destination`` (``.one()``) or a list (``.all()``)
        """
        # NOTE(review): the hook is registered without ``pass_many``; confirm
        # that the list branch below is actually reachable.
        if not many:
            return Destination.query.filter(Destination.id == data['id']).one()

        wanted_ids = [item['id'] for item in data]
        return Destination.query.filter(Destination.id.in_(wanted_ids)).all()
class AssociatedNotificationSchema(LemurInputSchema):
    """Input schema that resolves notification references (by ``id``) to ``Notification`` rows."""
    id = fields.Int(required=True)

    @post_load
    def get_object(self, data, many=False):
        """Look up the ``Notification`` object(s) named by the loaded payload.

        :param data: one loaded payload, or a list of them when ``many`` is set
        :param many: whether ``data`` is a list of payloads
        :return: a single ``Notification`` (``.one()``) or a list (``.all()``)
        """
        # NOTE(review): the hook is registered without ``pass_many``; confirm
        # that the list branch below is actually reachable.
        if not many:
            return Notification.query.filter(Notification.id == data['id']).one()

        wanted_ids = [item['id'] for item in data]
        return Notification.query.filter(Notification.id.in_(wanted_ids)).all()
class AssociatedCertificateSchema(LemurInputSchema):
    """Input schema that resolves certificate references (by ``id``) to ``Certificate`` rows."""
    id = fields.Int(required=True)

    @post_load
    def get_object(self, data, many=False):
        """Look up the ``Certificate`` object(s) named by the loaded payload.

        :param data: one loaded payload, or a list of them when ``many`` is set
        :param many: whether ``data`` is a list of payloads
        :return: a single ``Certificate`` (``.one()``) or a list (``.all()``)
        """
        # NOTE(review): the hook is registered without ``pass_many``; confirm
        # that the list branch below is actually reachable.
        if not many:
            return Certificate.query.filter(Certificate.id == data['id']).one()

        wanted_ids = [item['id'] for item in data]
        return Certificate.query.filter(Certificate.id.in_(wanted_ids)).all()
class AssociatedUserSchema(LemurInputSchema):
    """Input schema that resolves user references (by ``id``) to ``User`` rows."""
    id = fields.Int(required=True)

    @post_load
    def get_object(self, data, many=False):
        """Look up the ``User`` object(s) named by the loaded payload.

        :param data: one loaded payload, or a list of them when ``many`` is set
        :param many: whether ``data`` is a list of payloads
        :return: a single ``User`` (``.one()``) or a list of them (``.all()``)
        """
        # NOTE(review): the hook is registered without ``pass_many``; confirm
        # that the list branch below is actually reachable.
        if not many:
            return User.query.filter(User.id == data['id']).one()

        wanted_ids = [item['id'] for item in data]
        return User.query.filter(User.id.in_(wanted_ids)).all()
class PluginInputSchema(LemurInputSchema):
    """Input schema for a plugin selection; ``slug`` is the only required field."""
    plugin_options = fields.List(fields.Dict())
    slug = fields.String(required=True)
    title = fields.String()
    description = fields.String()

    @post_load
    def get_object(self, data, many=False):
        """Attach the plugin registered under ``data['slug']`` to the payload.

        :param data: loaded payload; mutated in place
        :param many: not read by this hook
        :return: the same ``data`` with a ``plugin_object`` key added
        """
        resolved_plugin = plugins.get(data['slug'])
        data['plugin_object'] = resolved_plugin
        return data
class PluginOutputSchema(LemurOutputSchema):
    """Output schema describing a plugin."""
    id = fields.Integer()
    label = fields.String()
    description = fields.String()
    active = fields.Boolean()
    # Dumped under the key ``pluginOptions`` rather than ``options``.
    options = fields.List(fields.Dict(), dump_to='pluginOptions')
    slug = fields.String()
    title = fields.String()
# Module-level schema objects for plugin output.
# ``plugins_output_schema`` is an instance configured with ``many=True``.
plugins_output_schema = PluginOutputSchema(many=True)
# NOTE(review): this binds the ``PluginOutputSchema`` class itself rather than
# an instance, unlike the name above -- confirm callers expect a class here.
plugin_output_schema = PluginOutputSchema
class BaseExtensionSchema(LemurSchema):
    """Common base for the extension schemas.

    Runs incoming data through ``self.under`` before loading and outgoing
    data through ``self.camel`` after dumping (presumably snake_case and
    camelCase key conversion -- confirm against ``LemurSchema``).
    """

    @pre_load(pass_many=True)
    def preprocess(self, data, many):
        """Return ``data`` transformed by ``self.under`` before it is loaded."""
        return self.under(data, many=many)

    @post_dump(pass_many=True)
    def post_process(self, data, many):
        """Return dumped ``data`` transformed by ``self.camel``.

        Falsy ``data`` is handed back untouched.
        """
        if not data:
            return data
        return self.camel(data, many=many)
class BasicConstraintsSchema(BaseExtensionSchema):
    """Extension schema that declares no fields of its own."""
class AuthorityIdentifierSchema(BaseExtensionSchema):
    """Extension schema holding the single boolean ``use_authority_cert``."""
    use_authority_cert = fields.Boolean()
class AuthorityKeyIdentifierSchema(BaseExtensionSchema):
    """Extension schema holding the single boolean ``use_key_identifier``."""
    use_key_identifier = fields.Boolean()
class CertificateInfoAccessSchema(BaseExtensionSchema):
    """Extension schema holding the single boolean ``include_aia``."""
    include_aia = fields.Boolean()

    @post_dump
    def handle_keys(self, data):
        """Rename the dumped ``include_aia`` key to ``includeAIA``.

        :param data: dumped dict for one object
        :return: ``{'includeAIA': <value>}``, or ``data`` unchanged when the
            ``include_aia`` key is absent
        """
        # The field is optional, so the key may be absent from the dump;
        # indexing it unconditionally raised KeyError in that case.
        if 'include_aia' not in data:
            return data
        return {'includeAIA': data['include_aia']}
class KeyUsageSchema(BaseExtensionSchema):
    """Extension schema made up of optional boolean key-usage flags."""
    use_crl_sign = fields.Boolean()
    use_data_encipherment = fields.Boolean()
    use_decipher_only = fields.Boolean()
    use_encipher_only = fields.Boolean()
    use_key_encipherment = fields.Boolean()
    use_digital_signature = fields.Boolean()
    use_non_repudiation = fields.Boolean()
class ExtendedKeyUsageSchema(BaseExtensionSchema):
    """Extension schema made up of optional boolean extended-key-usage flags."""
    use_server_authentication = fields.Boolean()
    use_client_authentication = fields.Boolean()
    use_eap_over_lan = fields.Boolean()
    use_eap_over_ppp = fields.Boolean()
    use_ocsp_signing = fields.Boolean()
    use_smart_card_authentication = fields.Boolean()
    use_timestamping = fields.Boolean()
class SubjectKeyIdentifierSchema(BaseExtensionSchema):
    """Extension schema holding the single boolean ``include_ski``."""
    include_ski = fields.Boolean()

    @post_dump
    def handle_keys(self, data):
        """Rename the dumped ``include_ski`` key to ``includeSKI``.

        :param data: dumped dict for one object
        :return: ``{'includeSKI': <value>}``, or ``data`` unchanged when the
            ``include_ski`` key is absent
        """
        # The field is optional, so the key may be absent from the dump;
        # indexing it unconditionally raised KeyError in that case.
        if 'include_ski' not in data:
            return data
        return {'includeSKI': data['include_ski']}
class SubAltNameSchema(BaseExtensionSchema):
    """Schema for one name entry: a ``name_type`` plus its ``value``.

    ``name_type`` is validated by ``validators.sub_alt_type``; neither field
    is required.
    """
    name_type = fields.String(validate=validators.sub_alt_type)
    value = fields.String()

    @validates_schema
    def check_sensitive(self, data):
        """Run the sensitive-domain check on ``DNSName`` entries.

        :param data: field values for one entry; either key may be absent
        :raises: whatever ``validators.sensitive_domain`` raises for the value
        """
        # Neither field is required, so read them defensively: indexing a
        # missing key raised KeyError instead of yielding a validation result.
        if data.get('name_type') != 'DNSName':
            return

        if 'value' in data:
            validators.sensitive_domain(data['value'])
class SubAltNamesSchema(BaseExtensionSchema):
    """Container schema: ``names`` is a list of nested ``SubAltNameSchema`` entries."""
    names = fields.Nested(SubAltNameSchema, many=True)
class CustomOIDSchema(BaseExtensionSchema):
    """Schema for one custom OID entry: ``oid``, ``oid_type`` and ``value``."""
    oid = fields.String()
    # Checked by ``validators.oid_type``.
    oid_type = fields.String(validate=validators.oid_type)
    value = fields.String()
class ExtensionSchema(BaseExtensionSchema):
    """Aggregate schema nesting each extension schema under its own key."""
    basic_constraints = fields.Nested(BasicConstraintsSchema)
    key_usage = fields.Nested(KeyUsageSchema)
    extended_key_usage = fields.Nested(ExtendedKeyUsageSchema)
    subject_key_identifier = fields.Nested(SubjectKeyIdentifierSchema)
    sub_alt_names = fields.Nested(SubAltNamesSchema)
    authority_identifier = fields.Nested(AuthorityIdentifierSchema)
    authority_key_identifier = fields.Nested(AuthorityKeyIdentifierSchema)
    certificate_info_access = fields.Nested(CertificateInfoAccessSchema)
    # Unlike the single nested schemas above, this is a list of entries.
    custom = fields.List(fields.Nested(CustomOIDSchema))