tiramisu/tiramisu/option/option.py

571 lines
24 KiB
Python

# -*- coding: utf-8 -*-
"option types and option description"
# Copyright (C) 2012-2013 Team tiramisu (see AUTHORS for all contributors)
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by the
# Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# The original `Config` design model is unproudly borrowed from
# the rough pypy's guys: http://codespeak.net/svn/pypy/dist/pypy/config/
# the whole pypy projet is under MIT licence
# ____________________________________________________________
import re
import sys
from IPy import IP
from types import FunctionType
from ..setting import log, undefined
from ..error import ConfigError, ContextError
from ..i18n import _
from .baseoption import Option, validate_callback
from ..autolib import carry_out_calculation
class ChoiceOption(Option):
"""represents a choice out of several objects.
The option can also have the value ``None``
"""
__slots__ = tuple()
def __init__(self, name, doc, values, default=None,
values_params=None, default_multi=None, requires=None,
multi=False, callback=None, callback_params=None,
validator=None, validator_params=None,
properties=None, warnings_only=False):
"""
:param values: is a list of values the option can possibly take
"""
if isinstance(values, FunctionType):
validate_callback(values, values_params, 'values')
else:
if values_params is not None:
raise ValueError(_('values is not a function, so values_params must be None'))
if not isinstance(values, tuple): # pragma: optional cover
raise TypeError(_('values must be a tuple or a function for {0}'
).format(name))
_setattr = object.__setattr__
_setattr(self, '_choice_values', values)
_setattr(self, '_choice_values_params', values_params)
super(ChoiceOption, self).__init__(name, doc, default=default,
default_multi=default_multi,
callback=callback,
callback_params=callback_params,
requires=requires,
multi=multi,
validator=validator,
validator_params=validator_params,
properties=properties,
warnings_only=warnings_only)
def impl_get_values(self, context, current_opt=undefined):
if current_opt is undefined:
current_opt = self
#FIXME cache? but in context...
values = self._choice_values
if isinstance(values, FunctionType):
if context is None:
values = []
else:
values_params = self._choice_values_params
if values_params is None:
values_params = {}
values = carry_out_calculation(current_opt, context=context,
callback=values,
callback_params=values_params)
if not isinstance(values, list): # pragma: optional cover
raise ConfigError(_('calculated values for {0} is not a list'
'').format(self.impl_getname()))
return values
def _validate(self, value, context=undefined, current_opt=undefined):
try:
values = self.impl_get_values(context, current_opt=current_opt)
if not value in values: # pragma: optional cover
raise ValueError(_('value {0} is not permitted, '
'only {1} is allowed'
'').format(value,
values))
except ContextError: # pragma: optional cover
log.debug('ChoiceOption validation, disabled because no context')
class BoolOption(Option):
"represents a choice between ``True`` and ``False``"
__slots__ = tuple()
def _validate(self, value, context=undefined, current_opt=undefined):
if not isinstance(value, bool):
raise ValueError(_('invalid boolean')) # pragma: optional cover
class IntOption(Option):
"represents a choice of an integer"
__slots__ = tuple()
def _validate(self, value, context=undefined, current_opt=undefined):
if not isinstance(value, int):
raise ValueError(_('invalid integer')) # pragma: optional cover
class FloatOption(Option):
"represents a choice of a floating point number"
__slots__ = tuple()
def _validate(self, value, context=undefined, current_opt=undefined):
if not isinstance(value, float):
raise ValueError(_('invalid float')) # pragma: optional cover
class StrOption(Option):
"represents the choice of a string"
__slots__ = tuple()
def _validate(self, value, context=undefined, current_opt=undefined):
if not isinstance(value, str):
raise ValueError(_('invalid string')) # pragma: optional cover
if sys.version_info[0] >= 3: # pragma: optional cover
#UnicodeOption is same as StrOption in python 3+
class UnicodeOption(StrOption):
__slots__ = tuple()
pass
else:
class UnicodeOption(Option):
"represents the choice of a unicode string"
__slots__ = tuple()
_empty = u''
def _validate(self, value, context=undefined, current_opt=undefined):
if not isinstance(value, unicode):
raise ValueError(_('invalid unicode')) # pragma: optional cover
class IPOption(Option):
"represents the choice of an ip"
__slots__ = tuple()
def __init__(self, name, doc, default=None, default_multi=None,
requires=None, multi=False, callback=None,
callback_params=None, validator=None, validator_params=None,
properties=None, private_only=False, allow_reserved=False,
warnings_only=False):
extra = {'_private_only': private_only,
'_allow_reserved': allow_reserved}
super(IPOption, self).__init__(name, doc, default=default,
default_multi=default_multi,
callback=callback,
callback_params=callback_params,
requires=requires,
multi=multi,
validator=validator,
validator_params=validator_params,
properties=properties,
warnings_only=warnings_only,
extra=extra)
def _validate(self, value, context=undefined, current_opt=undefined):
# sometimes an ip term starts with a zero
# but this does not fit in some case, for example bind does not like it
self._impl_valid_unicode(value)
for val in value.split('.'):
if val.startswith("0") and len(val) > 1:
raise ValueError(_('invalid IP')) # pragma: optional cover
# 'standard' validation
try:
IP('{0}/32'.format(value))
except ValueError: # pragma: optional cover
raise ValueError(_('invalid IP'))
def _second_level_validation(self, value, warnings_only):
ip = IP('{0}/32'.format(value))
if not self._get_extra('_allow_reserved') and ip.iptype() == 'RESERVED': # pragma: optional cover
if warnings_only:
msg = _("IP is in reserved class")
else:
msg = _("invalid IP, mustn't be in reserved class")
raise ValueError(msg)
if self._get_extra('_private_only') and not ip.iptype() == 'PRIVATE': # pragma: optional cover
if warnings_only:
msg = _("IP is not in private class")
else:
msg = _("invalid IP, must be in private class")
raise ValueError(msg)
def _cons_in_network(self, opts, vals, warnings_only):
if len(vals) != 3:
raise ConfigError(_('invalid len for vals')) # pragma: optional cover
if None in vals:
return
ip, network, netmask = vals
if IP(ip) not in IP('{0}/{1}'.format(network, netmask)): # pragma: optional cover
if warnings_only:
msg = _('IP {0} ({1}) not in network {2} ({3}) with netmask {4}'
' ({5})')
else:
msg = _('invalid IP {0} ({1}) not in network {2} ({3}) with '
'netmask {4} ({5})')
raise ValueError(msg.format(ip, opts[0].impl_getname(), network,
opts[1].impl_getname(), netmask, opts[2].impl_getname()))
# test if ip is not network/broadcast IP
opts[2]._cons_ip_netmask((opts[2], opts[0]), (netmask, ip), warnings_only)
class PortOption(Option):
"""represents the choice of a port
The port numbers are divided into three ranges:
the well-known ports,
the registered ports,
and the dynamic or private ports.
You can actived this three range.
Port number 0 is reserved and can't be used.
see: http://en.wikipedia.org/wiki/Port_numbers
"""
__slots__ = tuple()
def __init__(self, name, doc, default=None, default_multi=None,
requires=None, multi=False, callback=None,
callback_params=None, validator=None, validator_params=None,
properties=None, allow_range=False, allow_zero=False,
allow_wellknown=True, allow_registred=True,
allow_private=False, warnings_only=False):
extra = {'_allow_range': allow_range,
'_min_value': None,
'_max_value': None}
ports_min = [0, 1, 1024, 49152]
ports_max = [0, 1023, 49151, 65535]
is_finally = False
for index, allowed in enumerate([allow_zero,
allow_wellknown,
allow_registred,
allow_private]):
if extra['_min_value'] is None:
if allowed:
extra['_min_value'] = ports_min[index]
elif not allowed:
is_finally = True
elif allowed and is_finally:
raise ValueError(_('inconsistency in allowed range')) # pragma: optional cover
if allowed:
extra['_max_value'] = ports_max[index]
if extra['_max_value'] is None:
raise ValueError(_('max value is empty')) # pragma: optional cover
super(PortOption, self).__init__(name, doc, default=default,
default_multi=default_multi,
callback=callback,
callback_params=callback_params,
requires=requires,
multi=multi,
validator=validator,
validator_params=validator_params,
properties=properties,
warnings_only=warnings_only,
extra=extra)
def _validate(self, value, context=undefined, current_opt=undefined):
if isinstance(value, int):
value = unicode(value)
self._impl_valid_unicode(value)
if self._get_extra('_allow_range') and ":" in str(value): # pragma: optional cover
value = str(value).split(':')
if len(value) != 2:
raise ValueError(_('invalid port, range must have two values '
'only'))
if not value[0] < value[1]:
raise ValueError(_('invalid port, first port in range must be'
' smaller than the second one'))
else:
value = [value]
for val in value:
try:
val = int(val)
except ValueError: # pragma: optional cover
raise ValueError(_('invalid port'))
if not self._get_extra('_min_value') <= val <= self._get_extra('_max_value'): # pragma: optional cover
raise ValueError(_('invalid port, must be an integer between {0} '
'and {1}').format(self._get_extra('_min_value'),
self._get_extra('_max_value')))
class NetworkOption(Option):
"represents the choice of a network"
__slots__ = tuple()
def _validate(self, value, context=undefined, current_opt=undefined):
self._impl_valid_unicode(value)
try:
IP(value)
except ValueError: # pragma: optional cover
raise ValueError(_('invalid network address'))
def _second_level_validation(self, value, warnings_only):
ip = IP(value)
if ip.iptype() == 'RESERVED': # pragma: optional cover
if warnings_only:
msg = _("network address is in reserved class")
else:
msg = _("invalid network address, mustn't be in reserved class")
raise ValueError(msg)
class NetmaskOption(Option):
"represents the choice of a netmask"
__slots__ = tuple()
def _validate(self, value, context=undefined, current_opt=undefined):
self._impl_valid_unicode(value)
try:
IP('0.0.0.0/{0}'.format(value))
except ValueError: # pragma: optional cover
raise ValueError(_('invalid netmask address'))
def _cons_network_netmask(self, opts, vals, warnings_only):
#opts must be (netmask, network) options
if None in vals:
return
self.__cons_netmask(opts, vals[0], vals[1], False, warnings_only)
def _cons_ip_netmask(self, opts, vals, warnings_only):
#opts must be (netmask, ip) options
if None in vals:
return
self.__cons_netmask(opts, vals[0], vals[1], True, warnings_only)
def __cons_netmask(self, opts, val_netmask, val_ipnetwork, make_net,
warnings_only):
if len(opts) != 2:
raise ConfigError(_('invalid len for opts')) # pragma: optional cover
msg = None
try:
ip = IP('{0}/{1}'.format(val_ipnetwork, val_netmask),
make_net=make_net)
#if cidr == 32, ip same has network
if make_net and ip.prefixlen() != 32:
val_ip = IP(val_ipnetwork)
if ip.net() == val_ip:
msg = _("invalid IP {0} ({1}) with netmask {2},"
" this IP is a network")
if ip.broadcast() == val_ip:
msg = _("invalid IP {0} ({1}) with netmask {2},"
" this IP is a broadcast")
except ValueError: # pragma: optional cover
if not make_net:
msg = _('invalid network {0} ({1}) with netmask {2}')
if msg is not None: # pragma: optional cover
raise ValueError(msg.format(val_ipnetwork, opts[1].impl_getname(),
val_netmask))
class BroadcastOption(Option):
__slots__ = tuple()
def _validate(self, value, context=undefined, current_opt=undefined):
self._impl_valid_unicode(value)
try:
IP('{0}/32'.format(value))
except ValueError: # pragma: optional cover
raise ValueError(_('invalid broadcast address'))
def _cons_broadcast(self, opts, vals, warnings_only):
if len(vals) != 3:
raise ConfigError(_('invalid len for vals')) # pragma: optional cover
if None in vals:
return
broadcast, network, netmask = vals
if IP('{0}/{1}'.format(network, netmask)).broadcast() != IP(broadcast):
raise ValueError(_('invalid broadcast {0} ({1}) with network {2} '
'({3}) and netmask {4} ({5})').format(
broadcast, opts[0].impl_getname(), network,
opts[1].impl_getname(), netmask, opts[2].impl_getname())) # pragma: optional cover
class DomainnameOption(Option):
"""represents the choice of a domain name
netbios: for MS domain
hostname: to identify the device
domainname:
fqdn: with tld, not supported yet
"""
__slots__ = tuple()
def __init__(self, name, doc, default=None, default_multi=None,
requires=None, multi=False, callback=None,
callback_params=None, validator=None, validator_params=None,
properties=None, allow_ip=False, type_='domainname',
warnings_only=False, allow_without_dot=False):
if type_ not in ['netbios', 'hostname', 'domainname']:
raise ValueError(_('unknown type_ {0} for hostname').format(type_)) # pragma: optional cover
extra = {'_dom_type': type_}
if allow_ip not in [True, False]:
raise ValueError(_('allow_ip must be a boolean')) # pragma: optional cover
if allow_without_dot not in [True, False]:
raise ValueError(_('allow_without_dot must be a boolean')) # pragma: optional cover
extra['_allow_ip'] = allow_ip
extra['_allow_without_dot'] = allow_without_dot
extra['_domain_re'] = re.compile(r'^[a-z\d][a-z\d\-]*$')
extra['_has_upper'] = re.compile('[A-Z]')
super(DomainnameOption, self).__init__(name, doc, default=default,
default_multi=default_multi,
callback=callback,
callback_params=callback_params,
requires=requires,
multi=multi,
validator=validator,
validator_params=validator_params,
properties=properties,
warnings_only=warnings_only,
extra=extra)
def _validate(self, value, context=undefined, current_opt=undefined):
self._impl_valid_unicode(value)
def _valid_length(val):
if len(val) < 1:
raise ValueError(_("invalid domainname's length (min 1)"))
if len(val) > part_name_length:
raise ValueError(_("invalid domainname's length (max {0})"
"").format(part_name_length))
if self._get_extra('_allow_ip') is True: # pragma: optional cover
try:
IP('{0}/32'.format(value))
return
except ValueError:
pass
if self._get_extra('_dom_type') == 'netbios':
part_name_length = 15
else:
part_name_length = 63
if self._get_extra('_dom_type') == 'domainname':
if not self._get_extra('_allow_without_dot') and not "." in value:
raise ValueError(_("invalid domainname, must have dot"))
if len(value) > 255:
raise ValueError(_("invalid domainname's length (max 255)"))
for dom in value.split('.'):
_valid_length(dom)
else:
_valid_length(value)
def _second_level_validation(self, value, warnings_only):
def _valid_char(val):
if self._get_extra('_has_upper').search(val):
raise ValueError(_('some characters are uppercase'))
if not self._get_extra('_domain_re').search(val):
if warnings_only:
raise ValueError(_('some characters may cause problems'))
else:
raise ValueError(_('invalid domainname'))
#not for IP
if self._get_extra('_allow_ip') is True:
try:
IP('{0}/32'.format(value))
return
except ValueError:
pass
if self._get_extra('_dom_type') == 'domainname':
for dom in value.split('.'):
_valid_char(dom)
else:
_valid_char(value)
class EmailOption(DomainnameOption):
__slots__ = tuple()
username_re = re.compile(r"^[\w!#$%&'*+\-/=?^`{|}~.]+$")
def _validate(self, value, context=undefined, current_opt=undefined):
self._impl_valid_unicode(value)
splitted = value.split('@', 1)
try:
username, domain = splitted
except ValueError: # pragma: optional cover
raise ValueError(_('invalid email address, must contains one @'
))
if not self.username_re.search(username):
raise ValueError(_('invalid username in email address')) # pragma: optional cover
super(EmailOption, self)._validate(domain)
super(EmailOption, self)._second_level_validation(domain, False)
def _second_level_validation(self, value, warnings_only):
pass
class URLOption(DomainnameOption):
__slots__ = tuple()
proto_re = re.compile(r'(http|https)://')
path_re = re.compile(r"^[A-Za-z0-9\-\._~:/\?#\[\]@!%\$&\'\(\)\*\+,;=]+$")
def _validate(self, value, context=undefined, current_opt=undefined):
self._impl_valid_unicode(value)
match = self.proto_re.search(value)
if not match: # pragma: optional cover
raise ValueError(_('invalid url, must start with http:// or '
'https://'))
value = value[len(match.group(0)):]
# get domain/files
splitted = value.split('/', 1)
try:
domain, files = splitted
except ValueError:
domain = value
files = None
# if port in domain
splitted = domain.split(':', 1)
try:
domain, port = splitted
except ValueError: # pragma: optional cover
domain = splitted[0]
port = 0
if not 0 <= int(port) <= 65535:
raise ValueError(_('invalid url, port must be an between 0 and '
'65536')) # pragma: optional cover
# validate domainname
super(URLOption, self)._validate(domain)
super(URLOption, self)._second_level_validation(domain, False)
# validate file
if files is not None and files != '' and not self.path_re.search(files):
raise ValueError(_('invalid url, must ends with a valid resource name')) # pragma: optional cover
def _second_level_validation(self, value, warnings_only):
pass
class UsernameOption(Option):
__slots__ = tuple()
#regexp build with 'man 8 adduser' informations
username_re = re.compile(r"^[a-z_][a-z0-9_-]{0,30}[$a-z0-9_-]{0,1}$")
def _validate(self, value, context=undefined, current_opt=undefined):
self._impl_valid_unicode(value)
match = self.username_re.search(value)
if not match:
raise ValueError(_('invalid username')) # pragma: optional cover
class FilenameOption(Option):
__slots__ = tuple()
path_re = re.compile(r"^[a-zA-Z0-9\-\._~/+]+$")
def _validate(self, value, context=undefined, current_opt=undefined):
self._impl_valid_unicode(value)
match = self.path_re.search(value)
if not match:
raise ValueError(_('invalid filename')) # pragma: optional cover