lemur/lemur/schemas.py

298 lines
8.4 KiB
Python
Raw Normal View History

"""
.. module: lemur.schemas
:platform: unix
:copyright: (c) 2018 by Netflix Inc., see AUTHORS for more
:license: Apache, see LICENSE for more details.
.. moduleauthor:: Kevin Glisson <kglisson@netflix.com>
"""
2016-07-04 23:32:46 +02:00
from sqlalchemy.orm.exc import NoResultFound
from marshmallow import fields, post_load, pre_load, post_dump
2016-07-04 23:32:46 +02:00
from marshmallow.exceptions import ValidationError
from lemur.common import validators
2016-05-12 21:38:44 +02:00
from lemur.common.schema import LemurSchema, LemurInputSchema, LemurOutputSchema
2019-05-16 16:57:02 +02:00
from lemur.common.fields import (
KeyUsageExtension,
ExtendedKeyUsageExtension,
BasicConstraintsExtension,
SubjectAlternativeNameExtension,
)
2016-07-04 23:32:46 +02:00
from lemur.plugins import plugins
from lemur.plugins.utils import get_plugin_option
from lemur.roles.models import Role
from lemur.users.models import User
2016-07-04 23:32:46 +02:00
from lemur.authorities.models import Authority
from lemur.dns_providers.models import DnsProvider
from lemur.policies.models import RotationPolicy
2016-07-04 23:32:46 +02:00
from lemur.certificates.models import Certificate
from lemur.destinations.models import Destination
from lemur.notifications.models import Notification
def validate_options(options):
    """
    Ensures that the plugin options are valid.

    Reads the ``interval`` and ``unit`` plugin options, converts the interval
    to days (a month counts as 30 days, a week as 7 days, any other unit
    leaves the interval unchanged) and rejects anything above 90 days.

    :param options: plugin options, passed through to ``get_plugin_option``
    :return: None
    :raises ValidationError: if the resulting interval exceeds 90 days
    """
    interval = get_plugin_option("interval", options)
    unit = get_plugin_option("unit", options)

    # Without an interval there is nothing to bound. The previous guard only
    # returned when BOTH values were falsy, so a unit supplied without an
    # interval fell through to the arithmetic below and failed with a
    # TypeError instead of passing validation.
    if not interval:
        return

    if unit == "month":
        interval *= 30
    elif unit == "week":
        interval *= 7

    if interval > 90:
        raise ValidationError(
            "Notification cannot be more than 90 days into the future."
        )
def get_object_attribute(data, many=False):
    """
    Decide which attribute (``"id"`` or ``"name"``) identifies the object(s).

    For a single reference, ``id`` wins when it is truthy, then ``name``.
    For a list of references, ``id`` is used when every entry carries one;
    otherwise ``name`` is used when every entry carries one.

    :param data: dict, or list of dicts when ``many`` is set
    :param many: whether ``data`` is a list of references
    :return: ``"id"`` or ``"name"``
    :raises ValidationError: if neither attribute can identify the object(s)
    """
    if not many:
        for key in ("id", "name"):
            if data.get(key):
                return key
        raise ValidationError("Associated object require a name or id.")

    # Every entry must share the same lookup attribute; prefer ids.
    if None not in [entry.get("id") for entry in data]:
        return "id"
    if None not in [entry.get("name") for entry in data]:
        return "name"
    raise ValidationError("Associated object require a name or id.")
2016-07-04 23:32:46 +02:00
def fetch_objects(model, data, many=False):
    """
    Look up the database object(s) referenced by ``data``.

    The lookup attribute (``id`` or ``name``) is chosen by
    ``get_object_attribute``.

    :param model: model class; its ``query`` attribute and the chosen
        ``id``/``name`` attribute are used to build the filter
    :param data: dict carrying ``id`` or ``name``, or a list of such dicts
        when ``many`` is set
    :param many: whether ``data`` is a list of references
    :return: list of model instances when ``many``, otherwise one instance
    :raises ValidationError: if a referenced object cannot be found
    """
    attr = get_object_attribute(data, many=many)
    column = getattr(model, attr)

    if many:
        values = [v[attr] for v in data]
        items = model.query.filter(column.in_(values)).all()
        found = [getattr(i, attr) for i in items]
        diff = set(values).symmetric_difference(set(found))

        if diff:
            # Lookups by id yield integers; str.join() only accepts strings,
            # so stringify first (previously this raised TypeError and hid
            # the validation error). Sorting keeps the message stable.
            missing = ",".join(sorted(str(d) for d in diff))
            raise ValidationError(
                "Unable to locate {model} with {attr} {diff}".format(
                    model=model, attr=attr, diff=missing
                )
            )

        return items

    try:
        return model.query.filter(column == data[attr]).one()
    except NoResultFound:
        raise ValidationError(
            "Unable to find {model} with {attr}: {data}".format(
                model=model, attr=attr, data=data[attr]
            )
        )
class AssociatedAuthoritySchema(LemurInputSchema):
    """Input schema referencing an Authority by ``id`` or ``name``."""

    id = fields.Int()
    name = fields.String()

    @post_load
    def get_object(self, data, many=False):
        """
        Replace the loaded reference with the result of an Authority lookup.

        :param data: deserialized reference carrying ``id`` and/or ``name``
        :param many: forwarded to ``fetch_objects``
        :return: whatever ``fetch_objects`` returns for ``Authority``
        """
        return fetch_objects(model=Authority, data=data, many=many)
class AssociatedDnsProviderSchema(LemurInputSchema):
    """Input schema referencing a DnsProvider by ``id`` or ``name``."""

    id = fields.Int()
    name = fields.String()

    @post_load
    def get_object(self, data, many=False):
        """
        Replace the loaded reference with the result of a DnsProvider lookup.

        :param data: deserialized reference carrying ``id`` and/or ``name``
        :param many: forwarded to ``fetch_objects``
        :return: whatever ``fetch_objects`` returns for ``DnsProvider``
        """
        return fetch_objects(model=DnsProvider, data=data, many=many)
class AssociatedRoleSchema(LemurInputSchema):
    """Input schema referencing a Role by ``id`` or ``name``."""

    id = fields.Int()
    name = fields.String()

    @post_load
    def get_object(self, data, many=False):
        """
        Replace the loaded reference with the result of a Role lookup.

        :param data: deserialized reference carrying ``id`` and/or ``name``
        :param many: forwarded to ``fetch_objects``
        :return: whatever ``fetch_objects`` returns for ``Role``
        """
        return fetch_objects(model=Role, data=data, many=many)
class AssociatedDestinationSchema(LemurInputSchema):
    """Input schema referencing a Destination by ``id`` or ``name``."""

    id = fields.Int()
    name = fields.String()

    @post_load
    def get_object(self, data, many=False):
        """
        Replace the loaded reference with the result of a Destination lookup.

        :param data: deserialized reference carrying ``id`` and/or ``name``
        :param many: forwarded to ``fetch_objects``
        :return: whatever ``fetch_objects`` returns for ``Destination``
        """
        return fetch_objects(model=Destination, data=data, many=many)
class AssociatedNotificationSchema(LemurInputSchema):
    """Input schema referencing a Notification by ``id`` or ``name``."""

    id = fields.Int()
    name = fields.String()

    @post_load
    def get_object(self, data, many=False):
        """
        Replace the loaded reference with the result of a Notification lookup.

        :param data: deserialized reference carrying ``id`` and/or ``name``
        :param many: forwarded to ``fetch_objects``
        :return: whatever ``fetch_objects`` returns for ``Notification``
        """
        return fetch_objects(model=Notification, data=data, many=many)
class AssociatedCertificateSchema(LemurInputSchema):
    """Input schema referencing a Certificate by ``id`` or ``name``."""

    id = fields.Int()
    name = fields.String()

    @post_load
    def get_object(self, data, many=False):
        """
        Replace the loaded reference with the result of a Certificate lookup.

        :param data: deserialized reference carrying ``id`` and/or ``name``
        :param many: forwarded to ``fetch_objects``
        :return: whatever ``fetch_objects`` returns for ``Certificate``
        """
        return fetch_objects(model=Certificate, data=data, many=many)
2016-05-10 23:22:22 +02:00
class AssociatedUserSchema(LemurInputSchema):
    """Input schema referencing a User by ``id`` or ``name``."""

    id = fields.Int()
    name = fields.String()

    @post_load
    def get_object(self, data, many=False):
        """
        Replace the loaded reference with the result of a User lookup.

        :param data: deserialized reference carrying ``id`` and/or ``name``
        :param many: forwarded to ``fetch_objects``
        :return: whatever ``fetch_objects`` returns for ``User``
        """
        return fetch_objects(model=User, data=data, many=many)
2016-05-10 23:22:22 +02:00
class AssociatedRotationPolicySchema(LemurInputSchema):
    """Input schema referencing a RotationPolicy by ``id`` or ``name``."""

    id = fields.Int()
    name = fields.String()

    @post_load
    def get_object(self, data, many=False):
        """
        Replace the loaded reference with the result of a RotationPolicy lookup.

        :param data: deserialized reference carrying ``id`` and/or ``name``
        :param many: forwarded to ``fetch_objects``
        :return: whatever ``fetch_objects`` returns for ``RotationPolicy``
        """
        return fetch_objects(model=RotationPolicy, data=data, many=many)
2016-05-12 21:38:44 +02:00
class PluginInputSchema(LemurInputSchema):
    """
    Input schema for a plugin selection.

    ``slug`` is required; ``plugin_options`` is a list of option dicts that is
    checked by ``validate_options``.
    """

    plugin_options = fields.List(fields.Dict(), validate=validate_options)
    slug = fields.String(required=True)
    title = fields.String()
    description = fields.String()

    @post_load
    def get_object(self, data, many=False):
        """
        Attach the plugin registered under ``slug`` and load nested plugins.

        Options whose ``type`` contains ``"plugin"`` have their ``value``
        loaded through this same schema.

        :param data: deserialized plugin data
        :param many: unused; kept for signature compatibility
        :return: ``data`` with ``plugin_object`` added
        :raises ValidationError: if the lookup or sub-plugin parsing raises
        """
        try:
            data["plugin_object"] = plugins.get(data["slug"])

            # parse any sub-plugins
            for option in data.get("plugin_options", []):
                if "plugin" in option.get("type", []):
                    sub_data, errors = PluginInputSchema().load(option["value"])
                    # Only swap in the loaded value when loading succeeded.
                    # The errors used to be dropped and the option overwritten
                    # with whatever partial data the failed load produced.
                    if not errors:
                        option["value"] = sub_data

            return data
        except Exception as e:
            raise ValidationError(
                "Unable to find plugin. Slug: {0} Reason: {1}".format(data["slug"], e)
            )
2016-05-12 21:38:44 +02:00
class PluginOutputSchema(LemurOutputSchema):
    """Output schema listing the fields serialized for a plugin."""

    id = fields.Integer()
    label = fields.String()
    description = fields.String()
    active = fields.Boolean()
    # Read from the ``options`` attribute but emitted as "pluginOptions".
    options = fields.List(fields.Dict(), dump_to="pluginOptions")
    slug = fields.String()
    title = fields.String()
2016-05-13 23:35:38 +02:00
# Shared schema instance configured with many=True, i.e. for plugin collections.
plugins_output_schema = PluginOutputSchema(many=True)
# NOTE(review): this is bound to the schema class itself, not an instance,
# unlike plugins_output_schema -- confirm that callers expect a class.
plugin_output_schema = PluginOutputSchema
class BaseExtensionSchema(LemurSchema):
    """
    Common base for the extension schemas.

    Incoming data is passed through ``self.under`` before loading and dumped
    data through ``self.camel`` afterwards (presumably key-case conversion
    helpers provided by ``LemurSchema`` -- confirm there).
    """

    @pre_load(pass_many=True)
    def preprocess(self, data, many):
        """
        Transform raw input with ``self.under`` before it is loaded.

        :param data: raw input data
        :param many: whether ``data`` is a collection
        :return: result of ``self.under``
        """
        return self.under(data, many=many)

    @post_dump(pass_many=True)
    def post_process(self, data, many):
        """
        Transform dumped output with ``self.camel``.

        :param data: dumped data
        :param many: whether ``data`` is a collection
        :return: ``data`` unchanged when falsy, else result of ``self.camel``
        """
        if not data:
            return data
        return self.camel(data, many=many)
class AuthorityKeyIdentifierSchema(BaseExtensionSchema):
    """Schema holding the two boolean authority-key-identifier flags."""

    use_key_identifier = fields.Boolean()
    use_authority_cert = fields.Boolean()
class CertificateInfoAccessSchema(BaseExtensionSchema):
    """Schema with a single boolean ``include_aia`` flag."""

    include_aia = fields.Boolean()

    @post_dump
    def handle_keys(self, data):
        """
        Re-key the dumped flag under the fixed name ``includeAIA``.

        :param data: dumped data; must contain ``include_aia``
        :return: dict with the single key ``includeAIA``
        """
        include_aia = data["include_aia"]
        return {"includeAIA": include_aia}
class CRLDistributionPointsSchema(BaseExtensionSchema):
    """Schema with a single string ``include_crl_dp`` field."""

    include_crl_dp = fields.String()

    @post_dump
    def handle_keys(self, data):
        """
        Re-key the dumped value under the fixed name ``includeCRLDP``.

        :param data: dumped data; must contain ``include_crl_dp``
        :return: dict with the single key ``includeCRLDP``
        """
        include_crl_dp = data["include_crl_dp"]
        return {"includeCRLDP": include_crl_dp}
class SubjectKeyIdentifierSchema(BaseExtensionSchema):
    """Schema with a single boolean ``include_ski`` flag."""

    include_ski = fields.Boolean()

    @post_dump
    def handle_keys(self, data):
        """
        Re-key the dumped flag under the fixed name ``includeSKI``.

        :param data: dumped data; must contain ``include_ski``
        :return: dict with the single key ``includeSKI``
        """
        include_ski = data["include_ski"]
        return {"includeSKI": include_ski}
class CustomOIDSchema(BaseExtensionSchema):
    """Schema for one custom OID extension entry."""

    oid = fields.String()
    # Checked by the project's ``validators.encoding`` callable.
    encoding = fields.String(validate=validators.encoding)
    value = fields.String()
    is_critical = fields.Boolean()
class NamesSchema(BaseExtensionSchema):
    """Schema wrapping a ``names`` field of type SubjectAlternativeNameExtension."""

    names = SubjectAlternativeNameExtension()
class ExtensionSchema(BaseExtensionSchema):
    """
    Schema aggregating the certificate extension fields.

    Combines the custom extension field types with the nested extension
    schemas declared in this module.
    """

    basic_constraints = (
        BasicConstraintsExtension()
    )  # some devices balk on default basic constraints
    key_usage = KeyUsageExtension()
    extended_key_usage = ExtendedKeyUsageExtension()
    subject_key_identifier = fields.Nested(SubjectKeyIdentifierSchema)
    sub_alt_names = fields.Nested(NamesSchema)
    authority_key_identifier = fields.Nested(AuthorityKeyIdentifierSchema)
    certificate_info_access = fields.Nested(CertificateInfoAccessSchema)
    # Dumped under the explicit key "cRL_distribution_points".
    crl_distribution_points = fields.Nested(
        CRLDistributionPointsSchema, dump_to="cRL_distribution_points"
    )
    # FIXME: Convert custom OIDs to a custom field in fields.py like other Extensions
    # FIXME: Remove support in UI for Critical custom extensions https://github.com/Netflix/lemur/issues/665
    custom = fields.List(fields.Nested(CustomOIDSchema))
2016-06-27 23:40:46 +02:00
class EndpointNestedOutputSchema(LemurOutputSchema):
    """Output schema listing the endpoint fields serialized when nested."""

    # NOTE(review): presumably tells LemurOutputSchema not to wrap the dumped
    # data in an envelope -- confirm in lemur.common.schema.
    __envelope__ = False
    id = fields.Integer()
    description = fields.String()
    name = fields.String()
    dnsname = fields.String()
    owner = fields.Email()
    type = fields.String()
    active = fields.Boolean()