lemur/lemur/schemas.py

246 lines
7.2 KiB
Python
Raw Normal View History

"""
.. module: lemur.schemas
:platform: unix
:copyright: (c) 2015 by Netflix Inc., see AUTHORS for more
:license: Apache, see LICENSE for more details.
.. moduleauthor:: Kevin Glisson <kglisson@netflix.com>
"""
2016-07-04 23:32:46 +02:00
from sqlalchemy.orm.exc import NoResultFound
from marshmallow import fields, post_load, pre_load, post_dump, validates_schema
2016-07-04 23:32:46 +02:00
from marshmallow.exceptions import ValidationError
from lemur.common import validators
2016-05-12 21:38:44 +02:00
from lemur.common.schema import LemurSchema, LemurInputSchema, LemurOutputSchema
2016-07-04 23:32:46 +02:00
from lemur.plugins import plugins
from lemur.roles.models import Role
from lemur.users.models import User
2016-07-04 23:32:46 +02:00
from lemur.authorities.models import Authority
from lemur.certificates.models import Certificate
from lemur.destinations.models import Destination
from lemur.notifications.models import Notification
def fetch_object(model, field, value):
    """
    Return the single ``model`` row whose ``field`` equals ``value``.

    :param model: class exposing a ``query`` attribute and a ``field`` column
    :param field: name of the attribute to match on
    :param value: value the attribute must equal
    :return: the object returned by ``query.one()``
    :raises ValidationError: when the query reports ``NoResultFound``
    """
    column = getattr(model, field)
    lookup = model.query.filter(column == value)

    try:
        match = lookup.one()
    except NoResultFound:
        # Surface a missing row as a validation problem rather than a
        # database-layer exception.
        message = 'Unable to find {model} with {field}: {data}'.format(model=model, field=field, data=value)
        raise ValidationError(message)

    return match
def fetch_objects(model, field, values):
    """
    Return the ``model`` rows matching every entry in ``values``.

    :param model: class exposing a ``query`` attribute and a ``field`` column
    :param field: name of the attribute to match on
    :param values: iterable of dicts, each carrying ``field`` as a key
    :return: list of objects returned by the query
    :raises ValidationError: when the requested values and the values found
        on the returned objects are not the same set
    """
    values = [v[field] for v in values]
    items = model.query.filter(getattr(model, field).in_(values)).all()
    found = [getattr(i, field) for i in items]
    diff = set(values).symmetric_difference(set(found))

    if diff:
        # str.join needs an iterable of strings; the entries may be
        # non-strings (e.g. integer ids), so stringify each one. Sorting keeps
        # the message stable across runs.
        unmatched = ",".join(sorted(str(d) for d in diff))
        raise ValidationError('Unable to locate {model} with {field} {diff}'.format(model=model, field=field, diff=unmatched))

    return items
class AssociatedAuthoritySchema(LemurInputSchema):
    """Input schema that resolves an authority reference to an ``Authority``."""
    id = fields.Int()
    name = fields.String()

    @post_load
    def get_object(self, data, many=False):
        """
        Look the authority up by ``id`` when it is set, otherwise by ``name``.

        Falls through (implicitly returning ``None``) when neither key holds
        a truthy value.
        """
        # Order matters: ``id`` takes precedence over ``name``.
        for key in ('id', 'name'):
            if data.get(key):
                return fetch_object(Authority, key, data[key])
class AssociatedRoleSchema(LemurInputSchema):
    """Input schema that resolves a role reference to ``Role`` object(s)."""
    id = fields.Int(required=True)
    name = fields.String()

    @post_load
    def get_object(self, data, many=False):
        """
        Replace the loaded data with the matching ``Role`` row(s).

        With ``many`` set, ``data`` is treated as a list of dicts and resolved
        in one query; otherwise a single row is fetched by ``data['id']``.
        """
        if not many:
            return fetch_object(Role, 'id', data['id'])
        return fetch_objects(Role, 'id', data)
class AssociatedDestinationSchema(LemurInputSchema):
    """Input schema that resolves a destination reference to ``Destination`` object(s)."""
    id = fields.Int(required=True)
    name = fields.String()

    @post_load
    def get_object(self, data, many=False):
        """
        Replace the loaded data with the matching ``Destination`` row(s).

        With ``many`` set, ``data`` is treated as a list of dicts and resolved
        in one query; otherwise a single row is fetched by ``data['id']``.
        """
        if not many:
            return fetch_object(Destination, 'id', data['id'])
        return fetch_objects(Destination, 'id', data)
class AssociatedNotificationSchema(LemurInputSchema):
    """Input schema that resolves a notification reference to ``Notification`` object(s)."""
    id = fields.Int(required=True)

    @post_load
    def get_object(self, data, many=False):
        """
        Replace the loaded data with the matching ``Notification`` row(s).

        With ``many`` set, ``data`` is treated as a list of dicts and resolved
        in one query; otherwise a single row is fetched by ``data['id']``.
        """
        if not many:
            return fetch_object(Notification, 'id', data['id'])
        return fetch_objects(Notification, 'id', data)
class AssociatedCertificateSchema(LemurInputSchema):
    """Input schema that resolves a certificate reference to ``Certificate`` object(s)."""
    id = fields.Int(required=True)

    @post_load
    def get_object(self, data, many=False):
        """
        Replace the loaded data with the matching ``Certificate`` row(s).

        With ``many`` set, ``data`` is treated as a list of dicts and resolved
        in one query; otherwise a single row is fetched by ``data['id']``.
        """
        if not many:
            return fetch_object(Certificate, 'id', data['id'])
        return fetch_objects(Certificate, 'id', data)
2016-05-10 23:22:22 +02:00
class AssociatedUserSchema(LemurInputSchema):
    """Input schema that resolves a user reference to ``User`` object(s)."""
    id = fields.Int(required=True)

    @post_load
    def get_object(self, data, many=False):
        """
        Replace the loaded data with the matching ``User`` row(s).

        With ``many`` set, ``data`` is treated as a list of dicts and resolved
        in one query; otherwise a single row is fetched by ``data['id']``.
        """
        if not many:
            return fetch_object(User, 'id', data['id'])
        return fetch_objects(User, 'id', data)
2016-05-10 23:22:22 +02:00
2016-05-12 21:38:44 +02:00
class PluginInputSchema(LemurInputSchema):
    """Input schema describing a plugin selection, identified by its ``slug``."""
    plugin_options = fields.List(fields.Dict())
    slug = fields.String(required=True)
    title = fields.String()
    description = fields.String()

    @post_load
    def get_object(self, data, many=False):
        """
        Attach the plugin registered under ``data['slug']`` to the loaded data.

        :return: ``data`` with an added ``plugin_object`` key
        :raises ValidationError: when the plugin lookup raises any exception
        """
        try:
            plugin = plugins.get(data['slug'])
        except Exception:
            # Any lookup failure is reported to the client as an unknown plugin.
            raise ValidationError('Unable to find plugin: {0}'.format(data['slug']))

        data['plugin_object'] = plugin
        return data
2016-05-12 21:38:44 +02:00
class PluginOutputSchema(LemurOutputSchema):
    """Output schema used to serialize a plugin description."""
    id = fields.Integer()
    label = fields.String()
    description = fields.String()
    active = fields.Boolean()
    # Emitted under the key ``pluginOptions`` rather than ``options``.
    options = fields.List(fields.Dict(), dump_to='pluginOptions')
    slug = fields.String()
    title = fields.String()
2016-05-13 23:35:38 +02:00
# Shared schema instance for serializing a collection of plugins.
plugins_output_schema = PluginOutputSchema(many=True)
# NOTE(review): unlike the line above, this binds the schema class itself and
# not an instance -- confirm that callers expect a class before changing it.
plugin_output_schema = PluginOutputSchema
class BaseExtensionSchema(LemurSchema):
    """
    Common base for the certificate extension schemas.

    Runs ``self.under`` on incoming data before load and ``self.camel`` on
    non-empty outgoing data after dump. Both helpers are defined outside this
    class (presumably snake_case / camelCase key conversion -- confirm).
    """

    @pre_load(pass_many=True)
    def preprocess(self, data, many):
        """Pass the raw input through ``self.under`` before deserialization."""
        return self.under(data, many=many)

    @post_dump(pass_many=True)
    def post_process(self, data, many):
        """Pass non-empty serialized output through ``self.camel``."""
        if not data:
            # Empty / falsy results are handed back untouched.
            return data
        return self.camel(data, many=many)
class BasicConstraintsSchema(BaseExtensionSchema):
    """Schema for the basic constraints extension; declares no fields of its own."""
class AuthorityIdentifierSchema(BaseExtensionSchema):
    """Schema for the authority identifier extension options."""
    use_authority_cert = fields.Boolean()
class AuthorityKeyIdentifierSchema(BaseExtensionSchema):
    """Schema for the authority key identifier extension options."""
    use_key_identifier = fields.Boolean()
class CertificateInfoAccessSchema(BaseExtensionSchema):
    """Schema for the certificate info access extension options."""
    include_aia = fields.Boolean()

    @post_dump
    def handle_keys(self, data):
        """Return the dumped flag under the fixed key ``includeAIA``."""
        include_flag = data['include_aia']
        return {'includeAIA': include_flag}
class KeyUsageSchema(BaseExtensionSchema):
    """Schema for the key usage extension: one boolean flag per usage."""
    use_crl_sign = fields.Boolean()
    use_data_encipherment = fields.Boolean()
    use_decipher_only = fields.Boolean()
    use_encipher_only = fields.Boolean()
    use_key_encipherment = fields.Boolean()
    use_digital_signature = fields.Boolean()
    use_non_repudiation = fields.Boolean()
class ExtendedKeyUsageSchema(BaseExtensionSchema):
    """Schema for the extended key usage extension: one boolean flag per usage."""
    use_server_authentication = fields.Boolean()
    use_client_authentication = fields.Boolean()
    use_eap_over_lan = fields.Boolean()
    use_eap_over_ppp = fields.Boolean()
    use_ocsp_signing = fields.Boolean()
    use_smart_card_authentication = fields.Boolean()
    use_timestamping = fields.Boolean()
class SubjectKeyIdentifierSchema(BaseExtensionSchema):
    """Schema for the subject key identifier extension options."""
    include_ski = fields.Boolean()

    @post_dump
    def handle_keys(self, data):
        """Return the dumped flag under the fixed key ``includeSKI``."""
        include_flag = data['include_ski']
        return {'includeSKI': include_flag}
class SubAltNameSchema(BaseExtensionSchema):
    """Schema for a single subject alternative name entry."""
    name_type = fields.String(validate=validators.sub_alt_type)
    value = fields.String()

    @validates_schema
    def check_sensitive(self, data):
        """
        Run the sensitive-domain check on ``DNSName`` entries.

        :param data: loaded field values for this entry
        :raises: whatever ``validators.sensitive_domain`` raises for a
            rejected value
        """
        # Neither field is declared required, so either key may be missing
        # from ``data``; indexing unconditionally would raise KeyError instead
        # of letting validation report the problem.
        if data.get('name_type') != 'DNSName':
            return

        value = data.get('value')
        if value is not None:
            validators.sensitive_domain(value)
class SubAltNamesSchema(BaseExtensionSchema):
    """Schema wrapping a list of subject alternative name entries."""
    names = fields.Nested(SubAltNameSchema, many=True)
class CustomOIDSchema(BaseExtensionSchema):
    """Schema for a custom OID extension entry."""
    oid = fields.String()
    # Checked by ``validators.encoding``, which is defined outside this file.
    encoding = fields.String(validate=validators.encoding)
    value = fields.String()
class ExtensionSchema(BaseExtensionSchema):
    """Aggregate schema nesting each individual extension schema."""
    basic_constraints = fields.Nested(BasicConstraintsSchema)
    key_usage = fields.Nested(KeyUsageSchema)
    extended_key_usage = fields.Nested(ExtendedKeyUsageSchema)
    subject_key_identifier = fields.Nested(SubjectKeyIdentifierSchema)
    sub_alt_names = fields.Nested(SubAltNamesSchema)
    authority_identifier = fields.Nested(AuthorityIdentifierSchema)
    authority_key_identifier = fields.Nested(AuthorityKeyIdentifierSchema)
    certificate_info_access = fields.Nested(CertificateInfoAccessSchema)
    # Zero or more custom OID entries.
    custom = fields.List(fields.Nested(CustomOIDSchema))
2016-06-27 23:40:46 +02:00
class EndpointNestedOutputSchema(LemurOutputSchema):
    """Output schema for an endpoint embedded inside another response."""
    # NOTE(review): presumably switches off response enveloping in
    # LemurOutputSchema -- confirm against lemur.common.schema.
    __envelope__ = False
    id = fields.Integer()
    description = fields.String()
    name = fields.String()
    dnsname = fields.String()
    owner = fields.Email()
    type = fields.String()
    active = fields.Boolean()