tiramisu/tiramisu/option.py

1398 lines
57 KiB
Python

# -*- coding: utf-8 -*-
"option types and option description"
# Copyright (C) 2012-2013 Team tiramisu (see AUTHORS for all contributors)
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# The original `Config` design model is unproudly borrowed from
# the rough pypy's guys: http://codespeak.net/svn/pypy/dist/pypy/config/
# the whole pypy project is under the MIT licence
# ____________________________________________________________
import re
import sys
from copy import copy, deepcopy
from types import FunctionType
from IPy import IP
import warnings
from tiramisu.error import ConfigError, ConflictError, ValueWarning
from tiramisu.setting import groups, multitypes
from tiramisu.i18n import _
from tiramisu.autolib import carry_out_calculation
# a name made of (or starting with) digits is refused
name_regexp = re.compile(r'^\d+')
# names already used by the Config API, hence refused for options
forbidden_names = ('iter_all', 'iter_group', 'find', 'find_first',
                   'make_dict', 'unwrap_from_path', 'read_only',
                   'read_write', 'getowner', 'set_contexts')


def valid_name(name):
    """tells if `name` can be used as an option's name

    A valid name can be converted to `str`, does not start with a digit
    nor with an underscore, does not start with 'impl_' or 'cfgimpl_'
    and is not one of the `forbidden_names`.

    :param name: the candidate name
    :returns: `True` if the name is accepted, `False` otherwise
    :rtype: bool
    """
    try:
        name = str(name)
    except Exception:
        # only conversion failures mean "invalid name"; KeyboardInterrupt
        # and SystemExit must not be swallowed here
        return False
    if name_regexp.match(name) is not None:
        return False
    if name in forbidden_names:
        return False
    return not name.startswith(('_', 'impl_', 'cfgimpl_'))
#____________________________________________________________
#
class BaseOption(object):
    """This abstract base class stands for attribute access
    in options that have to be set only once, it is of course done in the
    __setattr__ method

    It also carries the informations dictionary and the helpers used to
    serialize/unserialize an option (pickle protocol).
    """
    __slots__ = ('_name', '_requires', '_properties', '_readonly',
                 '_consistencies', '_calc_properties', '_impl_informations',
                 '_state_consistencies', '_state_readonly', '_state_requires',
                 '_stated')

    def __init__(self, name, doc, requires, properties):
        """
        :param name: the option's name, checked with `valid_name`
        :param doc: the option's description, stored as information 'doc'
        :param requires: the requirements, given to `validate_requires_arg`
        :param properties: tuple of default properties (or `None`)
        :raises ValueError: if the name is invalid or if a property is
                            already set by a requirement
        :raises TypeError: if `properties` is not a tuple
        """
        if not valid_name(name):
            raise ValueError(_("invalid name: {0} for option").format(name))
        self._name = name
        self._impl_informations = {}
        self.impl_set_information('doc', doc)
        self._calc_properties, self._requires = validate_requires_arg(
            requires, self._name)
        self._consistencies = None
        if properties is None:
            properties = tuple()
        if not isinstance(properties, tuple):
            raise TypeError(_('invalid properties type {0} for {1},'
                              ' must be a tuple').format(
                                  type(properties),
                                  self._name))
        # a property cannot be both static and calculated by a requirement
        if self._calc_properties is not None and properties:
            set_forbidden_properties = set(properties) & self._calc_properties
            if set_forbidden_properties:
                raise ValueError('conflict: properties already set in '
                                 'requirement {0}'.format(
                                     list(set_forbidden_properties)))
        self._properties = properties  # 'hidden', 'disabled'...

    def __setattr__(self, name, value):
        """set once and only once some attributes in the option,
        like `_name`. `_name` cannot be changed once the option is
        pushed in the :class:`tiramisu.option.OptionDescription`.

        if the attribute `_readonly` is set to `True`, the option is
        "frozen" (which has nothing to do with the high level "freeze"
        property or "read_only" property)

        Attributes starting with `_state`, `_cache_paths` and
        `_consistencies` are never protected.

        :raises AttributeError: if the attribute is read-only
        """
        if not name.startswith('_state') and name not in ('_cache_paths',
                                                          '_consistencies'):
            is_readonly = False
            # never change _name
            if name == '_name':
                try:
                    self._name
                except AttributeError:
                    # _name is not set yet, first assignment is allowed
                    pass
                else:
                    is_readonly = True
            try:
                if self._readonly is True:
                    if value is True:
                        # already readonly and try to re set readonly
                        # don't raise, just exit
                        return
                    is_readonly = True
            except AttributeError:
                # _readonly is not set yet: the option is not frozen
                pass
            if is_readonly:
                raise AttributeError(_("'{0}' ({1}) object attribute '{2}' is"
                                       " read-only").format(
                                           self.__class__.__name__,
                                           self._name,
                                           name))
        object.__setattr__(self, name, value)

    # information
    def impl_set_information(self, key, value):
        """updates the information's attribute
        (which is a dictionary)

        :param key: information's key (ex: "help", "doc")
        :param value: information's value (ex: "the help string")
        """
        self._impl_informations[key] = value

    def impl_get_information(self, key, default=None):
        """retrieves one information's item

        :param key: the item string (ex: "help")
        :param default: value returned if the key is unknown; `None`
                        means "no default"
        :raises ValueError: if the key is unknown and no default is given
        """
        if key in self._impl_informations:
            return self._impl_informations[key]
        elif default is not None:
            return default
        else:
            raise ValueError(_("information's item not found: {0}").format(
                key))

    # serialize/unserialize
    def _impl_convert_consistencies(self, descr, load=False):
        """during serialization process, many things have to be done.
        one of them is the localisation of the options.
        The paths are set once for all.

        When saving, the options stored in `_consistencies` are replaced
        by their paths and the result goes in `_state_consistencies`;
        when loading, the reverse conversion is done.

        :type descr: :class:`tiramisu.option.OptionDescription`
        :param load: `True` if we are at the init of the option description
        :type load: bool
        """
        if not load and self._consistencies is None:
            self._state_consistencies = None
        elif load and self._state_consistencies is None:
            self._consistencies = None
            del(self._state_consistencies)
        else:
            if load:
                consistencies = self._state_consistencies
                convert = descr.impl_get_opt_by_path
            else:
                consistencies = self._consistencies
                convert = descr.impl_get_path_by_opt
            if isinstance(consistencies, list):
                # list of (function name, options) tuples
                new_value = []
                for consistency in consistencies:
                    values = [convert(obj) for obj in consistency[1]]
                    new_value.append((consistency[0], tuple(values)))
            else:
                # dict of lists of (function name, options) tuples,
                # keys are kept untouched
                new_value = {}
                for key, _consistencies in consistencies.items():
                    new_value[key] = []
                    for key_cons, _cons in _consistencies:
                        _list_cons = [convert(_con) for _con in _cons]
                        new_value[key].append((key_cons, tuple(_list_cons)))
            if load:
                del(self._state_consistencies)
                self._consistencies = new_value
            else:
                self._state_consistencies = new_value

    def _impl_convert_requires(self, descr, load=False):
        """export of the requires during the serialization process

        The first item of each requirement is an option when the object is
        alive and a path when it is serialized.

        :type descr: :class:`tiramisu.option.OptionDescription`
        :param load: `True` if we are at the init of the option description
        :type load: bool
        """
        if not load and self._requires is None:
            self._state_requires = None
        elif load and self._state_requires is None:
            self._requires = None
            del(self._state_requires)
        else:
            if load:
                _requires = self._state_requires
                convert = descr.impl_get_opt_by_path
            else:
                _requires = self._requires
                convert = descr.impl_get_path_by_opt
            new_value = []
            for requires in _requires:
                new_requires = []
                for require in requires:
                    new_require = [convert(require[0])]
                    new_require.extend(require[1:])
                    new_requires.append(tuple(new_require))
                new_value.append(tuple(new_requires))
            if load:
                del(self._state_requires)
                self._requires = new_value
            else:
                self._state_requires = new_value

    # serialize
    def _impl_getstate(self, descr):
        """the under the hood stuff that need to be done
        before the serialization.

        Every `_impl_convert_*` method of the object is called, then
        `_readonly` (if set) is copied in `_state_readonly`.

        :param descr: the parent :class:`tiramisu.option.OptionDescription`
        """
        self._stated = True
        for func in dir(self):
            if func.startswith('_impl_convert_'):
                getattr(self, func)(descr)
        try:
            self._state_readonly = self._readonly
        except AttributeError:
            # the option is not frozen, nothing to save
            pass

    def __getstate__(self, stated=True):
        """special method to enable the serialization with pickle
        Usually, a `__getstate__` method doesn't need any parameter,
        but some under the hood stuff need to be done before this action

        :parameter stated: if stated is `False`, `_stated` is removed
                           from the returned state
        :type stated: bool
        :returns: the attributes to serialize
        :rtype: dict
        :raises SystemError: if `_impl_getstate` has not been called before
        """
        try:
            self._stated
        except AttributeError:
            raise SystemError(_('cannot serialize Option, '
                                'only in OptionDescription'))
        # collect the slots of the whole class hierarchy
        slots = set()
        for subclass in self.__class__.__mro__:
            if subclass is not object:
                slots.update(subclass.__slots__)
        slots -= frozenset(['_cache_paths', '__weakref__'])
        states = {}
        for slot in slots:
            # remove variable if save variable converted
            # in _state_xxxx variable
            if '_state' + slot not in slots:
                if slot.startswith('_state'):
                    # should exists
                    states[slot] = getattr(self, slot)
                    # remove _state_xxx variable
                    self.__delattr__(slot)
                else:
                    try:
                        states[slot] = getattr(self, slot)
                    except AttributeError:
                        # this slot has never been set
                        pass
        if not stated:
            del(states['_stated'])
        return states

    # unserialize
    def _impl_setstate(self, descr):
        """the under the hood stuff that need to be done
        after the unserialization.

        Every `_impl_convert_*` method of the object is called with
        `load=True`, then `_readonly` is restored from `_state_readonly`.

        :type descr: :class:`tiramisu.option.OptionDescription`
        """
        for func in dir(self):
            if func.startswith('_impl_convert_'):
                getattr(self, func)(descr, load=True)
        try:
            self._readonly = self._state_readonly
            del(self._state_readonly)
            del(self._stated)
        except AttributeError:
            pass

    def __setstate__(self, state):
        """special method that enables us to unserialize (pickle)

        :parameter state: a dict is passed to the loads, it is the attributes
                          of the options object
        :type state: dict
        """
        for key, value in state.items():
            setattr(self, key, value)
class Option(BaseOption):
    """
    Abstract base class for configuration option's.

    Reminder: an Option object is **not** a container for the value.

    Subclasses have to provide a `_validate(value)` method raising
    `ValueError` for an invalid value.
    """
    __slots__ = ('_multi', '_validator', '_default_multi', '_default',
                 '_state_callback', '_callback', '_multitype',
                 '_warnings_only', '_master_slaves', '__weakref__')
    _empty = ''

    def __init__(self, name, doc, default=None, default_multi=None,
                 requires=None, multi=False, callback=None,
                 callback_params=None, validator=None, validator_params=None,
                 properties=None, warnings_only=False):
        """
        :param name: the option's name
        :param doc: the option's description
        :param default: specifies the default value of the option,
                        for a multi : ['bla', 'bla', 'bla']
        :param default_multi: 'bla' (used in case of a reset to default only at
                        a given index)
        :param requires: is a list of names of options located anywhere
                         in the configuration.
        :param multi: if true, the option's value is a list
        :param callback: the name of a function. If set, the function's output
                         is responsible of the option's value
        :param callback_params: the callback's parameter
        :param validator: the name of a function which stands for a custom
                          validation of the value
        :param validator_params: the validator's parameters
        :param properties: tuple of default properties
        :param warnings_only: _validator and _consistencies don't raise if True
                             Values()._warning contain message
        :raises ValueError: if the parameters are inconsistent or if a
                            default value is not valid
        """
        super(Option, self).__init__(name, doc, requires, properties)
        self._multi = multi
        if validator is not None:
            validate_callback(validator, validator_params, 'validator')
            self._validator = (validator, validator_params)
        else:
            self._validator = None
        if not self._multi and default_multi is not None:
            raise ValueError(_("a default_multi is set whereas multi is False"
                               " in option: {0}").format(name))
        if default_multi is not None:
            try:
                self._validate(default_multi)
            except ValueError as err:
                raise ValueError(_("invalid default_multi value {0} "
                                   "for option {1}: {2}").format(
                                       str(default_multi), name, err))
        if callback is not None and (default is not None or
                                     default_multi is not None):
            raise ValueError(_("default value not allowed if option: {0} "
                               "is calculated").format(name))
        if callback is None and callback_params is not None:
            raise ValueError(_("params defined for a callback function but "
                               "no callback defined"
                               " yet for option {0}").format(name))
        if callback is not None:
            validate_callback(callback, callback_params, 'callback')
            self._callback = (callback, callback_params)
        else:
            self._callback = None
        if self._multi:
            if default is None:
                default = []
            self._multitype = multitypes.default
            self._default_multi = default_multi
        self._warnings_only = warnings_only
        # the default value has to be valid too
        self.impl_validate(default)
        self._default = default

    def _launch_consistency(self, func, right_opt, right_val, context, index,
                            left_opts):
        """collects the values of all options of a consistency and calls
        the consistency function with them

        :param func: name of the consistency method (ex: '_cons_not_equal')
        :param right_opt: the option currently validated, must be one of
                          `left_opts`
        :param right_val: the value to use for `right_opt`
        :param context: Config's context, if `None` the default values of
                        the other options are used
        :param index: index of the value in a multi, `None` for the whole
                      value
        :param left_opts: all the options of the consistency
        :raises ConfigError: if `right_opt` is not in `left_opts`
        """
        if context is not None:
            descr = context.cfgimpl_get_description()
        #right_opt is also in left_opts
        if right_opt not in left_opts:
            raise ConfigError(_('right_opt not in left_opts'))
        left_vals = []
        for opt in left_opts:
            if right_opt == opt:
                value = right_val
            else:
                if context is not None:
                    path = descr.impl_get_path_by_opt(opt)
                    value = context._getattr(path, validate=False)
                else:
                    value = opt.impl_getdefault()
            if index is None:
                #could be multi or not
                left_vals.append(value)
            else:
                #value is not already set, could be higher
                try:
                    if right_opt == opt:
                        val = value
                    else:
                        val = value[index]
                    if val is None:
                        #no value so no consistencies
                        return
                    left_vals.append(val)
                except IndexError:
                    #so return if no value
                    return
        if self.impl_is_multi() and index is None:
            # whole multi: check index by index, skip incomplete rows
            for idx, _right_v in enumerate(right_val):
                try:
                    left_v = []
                    for left_val in left_vals:
                        left_v.append(left_val[idx])
                    if None in left_v:
                        continue
                except IndexError:
                    continue
                getattr(self, func)(left_opts, left_v)
        else:
            if None in left_vals:
                return
            getattr(self, func)(left_opts, left_vals)

    def impl_validate(self, value, context=None, validate=True,
                      force_no_multi=False):
        """
        :param value: the option's value
        :param context: Config's context
        :type context: :class:`tiramisu.config.Config`
        :param validate: if false, no validation is done at all
        :type validate: boolean
        :param force_no_multi: if multi, value has to be a list
                               not if force_no_multi is True
        :type force_no_multi: boolean
        :raises ValueError: if the value is not valid (only a warning is
                            emitted for the custom validator, consistencies
                            and second level validation if `_warnings_only`)
        """
        if not validate:
            return

        def val_validator(val):
            "calls the custom validator, `val` is its first parameter"
            if self._validator is not None:
                if self._validator[1] is not None:
                    validator_params = deepcopy(self._validator[1])
                    if '' in validator_params:
                        lst = list(validator_params[''])
                        lst.insert(0, val)
                        validator_params[''] = tuple(lst)
                    else:
                        validator_params[''] = (val,)
                else:
                    validator_params = {'': (val,)}
                # Raise ValueError if not valid
                carry_out_calculation(self._name, config=context,
                                      callback=self._validator[0],
                                      callback_params=validator_params)

        def do_validation(_value, _index=None):
            "validates one value (`None` is always accepted)"
            if _value is None:
                return
            # option validation
            self._validate(_value)
            try:
                # valid with self._validator
                val_validator(_value)
                # consistencies can only be checked with a context
                if context is not None:
                    descr._valid_consistency(self, _value, context, _index)
                self._second_level_validation(_value)
            except ValueError as err:
                msg = _("invalid value {0} for option {1}: {2}").format(
                    _value, self._name, err)
                if self._warnings_only:
                    warnings.warn_explicit(ValueWarning(msg, self),
                                           ValueWarning,
                                           self.__class__.__name__, 0)
                else:
                    raise ValueError(msg)

        # generic calculation
        if context is not None:
            descr = context.cfgimpl_get_description()
        if not self._multi or force_no_multi:
            do_validation(value)
        else:
            if not isinstance(value, list):
                raise ValueError(_("invalid value {0} for option {1} "
                                   "which must be a list").format(value,
                                                                  self._name))
            for index, val in enumerate(value):
                do_validation(val, index)

    def impl_getdefault(self, default_multi=False):
        """accessing the default value

        :param default_multi: if `True` and if the option is a multi,
                              returns the `default_multi` value instead
        """
        if default_multi and self.impl_is_multi():
            return self.impl_getdefault_multi()
        return self._default

    def impl_getdefault_multi(self):
        "accessing the default value for a multi"
        return self._default_multi

    def impl_get_multitype(self):
        "accessing the multitype (see `tiramisu.setting.multitypes`)"
        return self._multitype

    def impl_get_master_slaves(self):
        "accessing the `_master_slaves` attribute"
        return self._master_slaves

    def impl_is_empty_by_default(self):
        "no default value has been set yet"
        if self.impl_is_multi():
            return self._default == [] or None in self._default
        return self._default is None

    def impl_getdoc(self):
        "accesses the Option's doc"
        return self.impl_get_information('doc')

    def impl_has_callback(self):
        "to know if a callback has been defined or not"
        return self._callback is not None

    def impl_getkey(self, value):
        "the key of an option is the value itself"
        return value

    def impl_is_multi(self):
        "to know if the option's value is a list or not"
        return self._multi

    def impl_add_consistency(self, func, *left_opts):
        """adds a consistency between this option and other options

        :param func: name of the consistency, without the '_cons_' prefix
                     (ex: 'not_equal')
        :param left_opts: the other options of the consistency
        :raises ValueError: if an option is not usable or if the default
                            values are not consistent
        """
        if self._consistencies is None:
            self._consistencies = []
        for opt in left_opts:
            if not isinstance(opt, Option):
                raise ValueError(_('consistency should be set with an option'))
            if self is opt:
                raise ValueError(_('cannot add consistency with itself'))
            if self.impl_is_multi() != opt.impl_is_multi():
                raise ValueError(_('options in consistency should be multi in '
                                   'two sides'))
        func = '_cons_{0}'.format(func)
        opts = tuple([self] + list(left_opts))
        # default values have to be consistent
        self._launch_consistency(func, self, self.impl_getdefault(), None,
                                 None, opts)
        self._consistencies.append((func, opts))
        self.impl_validate(self.impl_getdefault())

    def _cons_not_equal(self, opts, vals):
        """consistency: the two values must be different

        :raises ConfigError: if there are not exactly two options
        :raises ValueError: if the two values are equal
        """
        if len(opts) != 2:
            raise ConfigError(_('invalid len for opts'))
        if vals[0] == vals[1]:
            raise ValueError(_("invalid value {0} for option {1} "
                               "must be different as {2} option"
                               "").format(vals[0], self._name, opts[1]._name))

    def _impl_convert_callbacks(self, descr, load=False):
        """export of the callback during the serialization process

        In the callback's parameters, a tuple's first item is an option
        when the object is alive and a path when it is serialized.

        :type descr: :class:`tiramisu.option.OptionDescription`
        :param load: `True` if we are at the init of the option description
        :type load: bool
        """
        if not load and self._callback is None:
            self._state_callback = None
        elif load and self._state_callback is None:
            self._callback = None
            del(self._state_callback)
        else:
            if load:
                callback, callback_params = self._state_callback
                convert = descr.impl_get_opt_by_path
            else:
                callback, callback_params = self._callback
                convert = descr.impl_get_path_by_opt
            if callback_params is not None:
                cllbck_prms = {}
                for key, values in callback_params.items():
                    vls = []
                    for value in values:
                        if isinstance(value, tuple):
                            value = (convert(value[0]), value[1])
                        vls.append(value)
                    cllbck_prms[key] = tuple(vls)
            else:
                cllbck_prms = None
            if load:
                del(self._state_callback)
                self._callback = (callback, cllbck_prms)
            else:
                self._state_callback = (callback, cllbck_prms)

    def _second_level_validation(self, value):
        "extra validation hook for subclasses, nothing to do by default"
        pass
class ChoiceOption(Option):
    """represents a choice out of several objects.

    The option can also have the value ``None``
    """
    __slots__ = ('_values', '_open_values')
    _opt_type = 'string'

    def __init__(self, name, doc, values, default=None, default_multi=None,
                 requires=None, multi=False, callback=None,
                 callback_params=None, open_values=False, validator=None,
                 validator_params=None, properties=None, warnings_only=False):
        """
        :param values: is a tuple of values the option can possibly take
        :param open_values: if `True`, values outside of `values` are
                            accepted too
        :raises TypeError: if `values` is not a tuple or `open_values` is
                           not a boolean
        """
        if not isinstance(values, tuple):
            raise TypeError(_('values must be a tuple for {0}').format(name))
        if open_values not in (True, False):
            raise TypeError(_('open_values must be a boolean for '
                              '{0}').format(name))
        # both attributes are needed to validate the default value,
        # so they are set before the parent's constructor is called
        self._values = values
        self._open_values = open_values
        super(ChoiceOption, self).__init__(name, doc,
                                           default=default,
                                           default_multi=default_multi,
                                           requires=requires,
                                           multi=multi,
                                           callback=callback,
                                           callback_params=callback_params,
                                           validator=validator,
                                           validator_params=validator_params,
                                           properties=properties,
                                           warnings_only=warnings_only)

    def impl_get_values(self):
        "accessing the allowed values"
        allowed = self._values
        return allowed

    def impl_is_openvalues(self):
        "to know if other values are accepted or not"
        is_open = self._open_values
        return is_open

    def _validate(self, value):
        "the value has to be one of the allowed values (if not open)"
        if self._open_values or value in self._values:
            return
        raise ValueError(_('value {0} is not permitted, '
                           'only {1} is allowed'
                           '').format(value, self._values))
class BoolOption(Option):
    "represents a choice between ``True`` and ``False``"
    __slots__ = tuple()
    _opt_type = 'bool'

    def _validate(self, value):
        "only `bool` instances are accepted"
        if isinstance(value, bool):
            return
        raise ValueError(_('value must be a boolean'))
class IntOption(Option):
    "represents a choice of an integer"
    __slots__ = tuple()
    _opt_type = 'int'

    def _validate(self, value):
        "only `int` instances are accepted"
        if isinstance(value, int):
            return
        raise ValueError(_('value must be an integer'))
class FloatOption(Option):
    "represents a choice of a floating point number"
    __slots__ = tuple()
    _opt_type = 'float'

    def _validate(self, value):
        "only `float` instances are accepted"
        if isinstance(value, float):
            return
        raise ValueError(_('value must be a float'))
class StrOption(Option):
    "represents the choice of a string"
    __slots__ = tuple()
    _opt_type = 'string'

    def _validate(self, value):
        "only `str` instances are accepted"
        if isinstance(value, str):
            return
        raise ValueError(_('value must be a string, not '
                           '{0}').format(type(value)))
if sys.version_info[0] < 3:
    class UnicodeOption(Option):
        "represents the choice of a unicode string"
        __slots__ = tuple()
        _opt_type = 'unicode'
        _empty = u''

        def _validate(self, value):
            "only `unicode` instances are accepted"
            if isinstance(value, unicode):
                return
            raise ValueError(_('value must be an unicode'))
else:
    # UnicodeOption is the same as StrOption in python 3+
    class UnicodeOption(StrOption):
        __slots__ = tuple()
class SymLinkOption(BaseOption):
    """a symlink is a read-only alias of an other option

    Every attribute that is not owned by the symlink itself is looked up
    in the linked option.
    """
    __slots__ = ('_name', '_opt', '_state_opt')
    _opt_type = 'symlink'
    #not return _opt consistencies
    _consistencies = {}

    def __init__(self, name, opt):
        """
        :param name: the symlink's name
        :param opt: the linked option
        :type opt: :class:`tiramisu.option.Option`
        :raises ValueError: if `opt` is not an `Option`
        """
        self._name = name
        if not isinstance(opt, Option):
            raise ValueError(_('malformed symlinkoption '
                               'must be an option '
                               'for symlink {0}').format(name))
        self._opt = opt
        self._readonly = True

    def __getattr__(self, name):
        """called only when the normal lookup failed: the symlink's own
        attributes are then really missing, everything else comes from
        the linked option

        :raises AttributeError: if the attribute does not exist
        """
        if name in ('_name', '_opt', '_opt_type', '_readonly'):
            # never delegate these ones (it would loop if `_opt` is not
            # set yet); `object` has no `__getattr__`, so use
            # `__getattribute__` which raises a regular AttributeError
            return object.__getattribute__(self, name)
        else:
            return getattr(self._opt, name)

    def _impl_getstate(self, descr):
        "saves the path of the linked option in `_state_opt`"
        super(SymLinkOption, self)._impl_getstate(descr)
        self._state_opt = descr.impl_get_path_by_opt(self._opt)

    def _impl_setstate(self, descr):
        "restores the linked option from the path saved in `_state_opt`"
        self._opt = descr.impl_get_opt_by_path(self._state_opt)
        del(self._state_opt)
        super(SymLinkOption, self)._impl_setstate(descr)

    def _impl_convert_consistencies(self, descr, load=False):
        "a symlink has no consistencies of its own, nothing to convert"
        if load:
            del(self._state_consistencies)
        else:
            self._state_consistencies = None
class IPOption(Option):
    "represents the choice of an ip"
    __slots__ = ('_private_only', '_allow_reserved')
    _opt_type = 'ip'

    def __init__(self, name, doc, default=None, default_multi=None,
                 requires=None, multi=False, callback=None,
                 callback_params=None, validator=None, validator_params=None,
                 properties=None, private_only=False, allow_reserved=False,
                 warnings_only=False):
        """
        :param private_only: if true, the `iptype()` of the IP has to be
                             'PRIVATE'
        :param allow_reserved: if false, an IP whose `iptype()` is
                               'RESERVED' is refused
        """
        # needed by the validation of the default value, so set them
        # before calling the parent's constructor
        self._private_only = private_only
        self._allow_reserved = allow_reserved
        super(IPOption, self).__init__(name, doc,
                                       default=default,
                                       default_multi=default_multi,
                                       requires=requires,
                                       multi=multi,
                                       callback=callback,
                                       callback_params=callback_params,
                                       validator=validator,
                                       validator_params=validator_params,
                                       properties=properties,
                                       warnings_only=warnings_only)

    def _validate(self, value):
        "the value has to be accepted by `IP` with a /32 prefix"
        address = '{0}/32'.format(value)
        try:
            IP(address)
        except ValueError:
            raise ValueError(_('invalid IP {0}').format(self._name))

    def _second_level_validation(self, value):
        "checks the class of the IP (reserved, private)"
        ip = IP('{0}/32'.format(value))
        if not self._allow_reserved:
            if ip.iptype() == 'RESERVED':
                raise ValueError(_("IP mustn't not be in reserved class"))
        if self._private_only:
            if ip.iptype() != 'PRIVATE':
                raise ValueError(_("IP must be in private class"))
class PortOption(Option):
    """represents the choice of a port
    The port numbers are divided into three ranges:
    the well-known ports,
    the registered ports,
    and the dynamic or private ports.
    You can activate these three ranges.
    Port number 0 is reserved and can't be used.
    see: http://en.wikipedia.org/wiki/Port_numbers
    """
    __slots__ = ('_allow_range', '_allow_zero', '_min_value', '_max_value')
    _opt_type = 'port'

    def __init__(self, name, doc, default=None, default_multi=None,
                 requires=None, multi=False, callback=None,
                 callback_params=None, validator=None, validator_params=None,
                 properties=None, allow_range=False, allow_zero=False,
                 allow_wellknown=True, allow_registred=True,
                 allow_private=False, warnings_only=False):
        """
        :param allow_range: if true, a value like 'first:last' is accepted
        :param allow_zero: allows the port 0
        :param allow_wellknown: allows the ports 1 to 1023
        :param allow_registred: allows the ports 1024 to 49151
        :param allow_private: allows the ports 49152 to 65535
        :raises ValueError: if the allowed ranges are not contiguous or if
                            no range is allowed
        """
        self._allow_range = allow_range
        self._min_value = None
        self._max_value = None
        ports_min = [0, 1, 1024, 49152]
        ports_max = [0, 1023, 49151, 65535]
        # becomes True when a refused range follows an allowed one: no
        # other range can be allowed after that
        is_finally = False
        for index, allowed in enumerate([allow_zero,
                                         allow_wellknown,
                                         allow_registred,
                                         allow_private]):
            if self._min_value is None:
                if allowed:
                    self._min_value = ports_min[index]
            elif not allowed:
                is_finally = True
            elif allowed and is_finally:
                raise ValueError(_('inconsistency in allowed range'))
            if allowed:
                self._max_value = ports_max[index]
        if self._max_value is None:
            raise ValueError(_('max value is empty'))
        super(PortOption, self).__init__(name, doc, default=default,
                                         default_multi=default_multi,
                                         callback=callback,
                                         callback_params=callback_params,
                                         requires=requires,
                                         multi=multi,
                                         validator=validator,
                                         validator_params=validator_params,
                                         properties=properties,
                                         warnings_only=warnings_only)

    def _validate(self, value):
        """the port (or the two ports of a range) has to be between
        `_min_value` and `_max_value`

        :raises ValueError: if a port is not a number, is out of the
                            allowed values or if the range is malformed
        """
        if self._allow_range and ":" in str(value):
            value = str(value).split(':')
            if len(value) != 2:
                raise ValueError('range must have two values only')
            # ports are compared as numbers: as strings '20' would be
            # greater than '100'
            if not int(value[0]) < int(value[1]):
                raise ValueError('first port in range must be'
                                 ' smaller than the second one')
        else:
            value = [value]
        for val in value:
            if not self._min_value <= int(val) <= self._max_value:
                raise ValueError('port must be an between {0} and {1}'
                                 ''.format(self._min_value, self._max_value))
class NetworkOption(Option):
    "represents the choice of a network"
    __slots__ = tuple()
    _opt_type = 'network'

    def _validate(self, value):
        "the value has to be accepted by `IP`"
        try:
            IP(value)
        except ValueError:
            message = _('invalid network address {0}').format(self._name)
            raise ValueError(message)

    def _second_level_validation(self, value):
        "a network whose `iptype()` is 'RESERVED' is refused"
        network = IP(value)
        if network.iptype() != 'RESERVED':
            return
        raise ValueError(_("network shall not be in reserved class"))
class NetmaskOption(Option):
    "represents the choice of a netmask"
    __slots__ = tuple()
    _opt_type = 'netmask'

    def _validate(self, value):
        "the value has to be accepted by `IP` as the mask of 0.0.0.0"
        try:
            IP('0.0.0.0/{0}'.format(value))
        except ValueError:
            message = _('invalid netmask address {0}').format(self._name)
            raise ValueError(message)

    def _cons_network_netmask(self, opts, vals):
        "consistency between a netmask and a network"
        #opts must be (netmask, network) options
        netmask, network = vals[0], vals[1]
        self.__cons_netmask(opts, netmask, network, False)

    def _cons_ip_netmask(self, opts, vals):
        "consistency between a netmask and an ip"
        #opts must be (netmask, ip) options
        netmask, ip = vals[0], vals[1]
        self.__cons_netmask(opts, netmask, ip, True)

    def __cons_netmask(self, opts, val_netmask, val_ipnetwork, make_net):
        """checks an ip or a network against the netmask

        :param opts: the (netmask, ip or network) options
        :param val_netmask: the netmask's value
        :param val_ipnetwork: the value of the ip or of the network
        :param make_net: `True` if `val_ipnetwork` is an ip, `False` if it
                         is a network
        :raises ConfigError: if there are not exactly two options
        :raises ValueError: if the values are not consistent
        """
        if len(opts) != 2:
            raise ConfigError(_('invalid len for opts'))
        address = '{0}/{1}'.format(val_ipnetwork, val_netmask)
        msg = None
        try:
            ip = IP(address, make_net=make_net)
            #if cidr == 32, ip same has network
            if ip.prefixlen() != 32:
                # build it again the other way round to know if the
                # value is a network's address or not
                try:
                    IP(address, make_net=not make_net)
                except ValueError:
                    if not make_net:
                        msg = _("invalid network {0} ({1}) "
                                "with netmask {2} ({3}),"
                                " this network is an IP")
                else:
                    if make_net:
                        msg = _("invalid IP {0} ({1}) with netmask {2} ({3}),"
                                " this IP is a network")
        except ValueError:
            if make_net:
                msg = _("invalid IP {0} ({1}) with netmask {2} ({3})")
            else:
                msg = _("invalid network {0} ({1}) with netmask {2} ({3})")
        if msg is None:
            return
        raise ValueError(msg.format(val_ipnetwork, opts[1]._name,
                                    val_netmask, self._name))
class BroadcastOption(Option):
    "represents the choice of a broadcast address"
    __slots__ = tuple()
    _opt_type = 'broadcast'

    def _validate(self, value):
        "the value has to be accepted by `IP` with a /32 prefix"
        address = '{0}/32'.format(value)
        try:
            IP(address)
        except ValueError:
            message = _('invalid broadcast address {0}').format(self._name)
            raise ValueError(message)

    def _cons_broadcast(self, opts, vals):
        """consistency between a broadcast, a network and a netmask

        :param opts: the (broadcast, network, netmask) options
        :param vals: the values of these options, in the same order
        :raises ConfigError: if there are not exactly three values
        :raises ValueError: if the broadcast is not the one of the network
        """
        if len(vals) != 3:
            raise ConfigError(_('invalid len for vals'))
        broadcast, network, netmask = vals
        expected = IP('{0}/{1}'.format(network, netmask)).broadcast()
        if expected != IP(broadcast):
            raise ValueError(_('invalid broadcast {0} ({1}) with network {2} '
                               '({3}) and netmask {4} ({5})').format(
                                   broadcast, opts[0]._name, network,
                                   opts[1]._name, netmask, opts[2]._name))
class DomainnameOption(Option):
    "represents the choice of a domain name"
    __slots__ = ('_type', '_allow_ip')
    _opt_type = 'domainname'

    def __init__(self, name, doc, default=None, default_multi=None,
                 requires=None, multi=False, callback=None,
                 callback_params=None, validator=None, validator_params=None,
                 properties=None, allow_ip=False, type_='domainname',
                 warnings_only=False):
        """
        :param allow_ip: if true, an IP is accepted as value too
        :param type_: 'netbios', 'hostname' or 'domainname'
        :raises ValueError: if `type_` is unknown or if `allow_ip` is not
                            a boolean
        """
        #netbios: for MS domain
        #hostname: to identify the device
        #domainname:
        #fqdn: with tld, not supported yet
        if type_ not in ['netbios', 'hostname', 'domainname']:
            raise ValueError(_('unknown type_ {0} for hostname').format(type_))
        self._type = type_
        if allow_ip not in [True, False]:
            raise ValueError(_('allow_ip must be a boolean'))
        self._allow_ip = allow_ip
        super(DomainnameOption, self).__init__(name, doc, default=default,
                                               default_multi=default_multi,
                                               callback=callback,
                                               callback_params=callback_params,
                                               requires=requires,
                                               multi=multi,
                                               validator=validator,
                                               validator_params=validator_params,
                                               properties=properties,
                                               warnings_only=warnings_only)

    def _validate(self, value):
        """checks the length and the characters of the name

        The maximum length is 15 for a netbios, 63 for a hostname and 255
        for a domainname; a domainname must contain a dot.

        :raises ValueError: if the value is not valid
        """
        if self._allow_ip is True:
            try:
                IP('{0}/32'.format(value))
            except ValueError:
                # not an IP, so validate it as a name
                pass
            else:
                return
        if self._type == 'netbios':
            length = 15
            extrachar = ''
        elif self._type == 'hostname':
            length = 63
            extrachar = ''
        elif self._type == 'domainname':
            length = 255
            # raw string: '\.' is an invalid escape in a normal string
            extrachar = r'\.'
            if '.' not in value:
                raise ValueError(_("invalid value for {0}, must have dot"
                                   "").format(self._name))
        if len(value) > length:
            raise ValueError(_("invalid domainname's length for"
                               " {0} (max {1})").format(self._name, length))
        if len(value) == 1:
            raise ValueError(_("invalid domainname's length for {0} (min 2)"
                               "").format(self._name))
        # starts with a letter, ends with a letter or a digit
        regexp = r'^[a-z]([a-z\d{0}-])*[a-z\d]$'.format(extrachar)
        if re.match(regexp, value) is None:
            raise ValueError(_('invalid domainname'))
class OptionDescription(BaseOption):
"""Config's schema (organisation, group) and container of Options
The `OptionsDescription` objects lives in the `tiramisu.config.Config`.
"""
__slots__ = ('_name', '_requires', '_cache_paths', '_group_type',
'_state_group_type', '_properties', '_children',
'_consistencies', '_calc_properties', '__weakref__',
'_readonly', '_impl_informations', '_state_requires',
'_state_consistencies', '_stated', '_state_readonly')
_opt_type = 'optiondescription'
def __init__(self, name, doc, children, requires=None, properties=None):
    """
    :param children: a list of options (including optiondescriptions)
    :raises ConflictError: if two children have the same name
    """
    super(OptionDescription, self).__init__(name, doc, requires, properties)
    child_names = [child._name for child in children]
    # once sorted, two identical names are next to each other
    # (better performance like this)
    previous = None
    for current in sorted(child_names):
        if current == previous:
            raise ConflictError(_('duplicate option name: '
                                  '{0}').format(current))
        previous = current
    self._children = (tuple(child_names), tuple(children))
    self._cache_paths = None
    # the group_type is useful for filtering OptionDescriptions in a config
    self._group_type = groups.default
def impl_getdoc(self):
    "accesses the OptionDescription's doc"
    doc = self.impl_get_information('doc')
    return doc
def __getattr__(self, name):
    """a child is reachable as an attribute of its OptionDescription

    :raises AttributeError: if `name` is neither a slot nor a child's name
    """
    if name in self.__slots__:
        # a slot of the object itself, never a child
        return object.__getattribute__(self, name)
    children = self._children[1]
    try:
        position = self._children[0].index(name)
    except ValueError:
        raise AttributeError(_('unknown Option {0} '
                               'in OptionDescription {1}'
                               '').format(name, self._name))
    return children[position]
def impl_getkey(self, config):
    """builds the key of the description: the tuple of the keys of all
    the children, each child's value is read in `config`
    """
    keys = []
    for child in self.impl_getchildren():
        child_value = getattr(config, child._name)
        keys.append(child.impl_getkey(child_value))
    return tuple(keys)
def impl_getpaths(self, include_groups=False, _currpath=None):
    """returns a list of all paths in self, recursively

    :param include_groups: if true, the paths of the optiondescriptions
                           are returned too
    :param _currpath: should not be provided (helps with recursion)
    """
    prefix = [] if _currpath is None else _currpath
    found = []
    for child in self.impl_getchildren():
        child_path = prefix + [child._name]
        if not isinstance(child, OptionDescription):
            found.append('.'.join(child_path))
            continue
        if include_groups:
            found.append('.'.join(child_path))
        found.extend(child.impl_getpaths(include_groups=include_groups,
                                         _currpath=child_path))
    return found
def impl_getchildren(self):
    "accessing the tuple of the children (second item of `_children`)"
    children = self._children[1]
    return children
def impl_build_cache(self,
                     cache_path=None,
                     cache_option=None,
                     _currpath=None,
                     _consistencies=None,
                     force_no_consistencies=False):
    """builds, recursively, the cache of all options and of their paths

    On the top level call the result is saved in `_cache_paths` as
    (options, paths). Unless `force_no_consistencies` is true, the
    consistencies of all options are collected in `_consistencies` and
    every object met is set read-only.

    :param cache_path: list of the paths (helps with recursion)
    :param cache_option: list of the options (helps with recursion)
    :param _currpath: should not be provided (helps with recursion)
    :param _consistencies: dict of the consistencies (helps with recursion)
    :param force_no_consistencies: if true, consistencies are not
                                   collected and nothing is set read-only
    :raises ConflictError: if an option is found twice
    """
    is_root = _currpath is None
    if is_root and self._cache_paths is not None:
        # cache already set
        return
    with_consistencies = not force_no_consistencies
    if is_root:
        _currpath = []
        if with_consistencies:
            _consistencies = {}
    if cache_path is None:
        cache_path = []
        cache_option = []
    for child in self.impl_getchildren():
        child_name = child._name
        if child in cache_option:
            raise ConflictError(_('duplicate option: {0}').format(child))
        cache_option.append(child)
        if with_consistencies:
            child._readonly = True
        cache_path.append(str('.'.join(_currpath + [child_name])))
        if isinstance(child, OptionDescription):
            # go down in the group, with the group's name in the path
            _currpath.append(child_name)
            child.impl_build_cache(cache_path,
                                   cache_option,
                                   _currpath,
                                   _consistencies,
                                   force_no_consistencies)
            _currpath.pop()
        elif with_consistencies and child._consistencies is not None:
            # a consistency is registered for each of its options
            for func, left_opts in child._consistencies:
                for opt in left_opts:
                    _consistencies.setdefault(opt, []).append((func,
                                                               left_opts))
    if is_root:
        self._cache_paths = (tuple(cache_option), tuple(cache_path))
        if with_consistencies:
            self._consistencies = _consistencies
            self._readonly = True
def impl_get_opt_by_path(self, path):
    """Return the cached option registered under ``path``.

    :param path: dotted path as stored in ``self._cache_paths``
    :raises AttributeError: if the path is not in the cache
    """
    cached_options = self._cache_paths[0]
    cached_paths = self._cache_paths[1]
    try:
        position = cached_paths.index(path)
    except ValueError:
        raise AttributeError(_('no option for path {0}').format(path))
    return cached_options[position]
def impl_get_path_by_opt(self, opt):
    """Return the cached dotted path under which ``opt`` is registered.

    :param opt: option as stored in ``self._cache_paths``
    :raises AttributeError: if the option is not in the cache
    """
    cached_paths = self._cache_paths[1]
    cached_options = self._cache_paths[0]
    try:
        position = cached_options.index(opt)
    except ValueError:
        raise AttributeError(_('no option {0} found').format(opt))
    return cached_paths[position]
# ____________________________________________________________
def impl_set_group_type(self, group_type):
    """Set a given group object on this OptionDescription.

    For a master group, every child is validated *before* any attribute
    is modified, so a rejected call leaves the description and its
    children untouched. Once validated, the child named like the group
    becomes the master and every other child becomes one of its slaves.

    :param group_type: an instance of `GroupType` or `MasterGroupType`
                       that lives in `setting.groups`
    :raises TypeError: if a group type other than `groups.default` is
                       already set
    :raises ValueError: if `group_type` is not a `GroupType`, or if the
                        children do not fit a master group (subgroup,
                        symlinkoption, non-multi option, or no child
                        named like the group)
    """
    if self._group_type != groups.default:
        raise TypeError(_('cannot change group_type if already set '
                          '(old {0}, new {1})').format(self._group_type,
                                                       group_type))
    if not isinstance(group_type, groups.GroupType):
        raise ValueError(_('group_type: {0}'
                           ' not allowed').format(group_type))
    if isinstance(group_type, groups.MasterGroupType):
        # the master is the child that has the same name as the group,
        # all the other children are collected as slaves
        master = None
        slaves = []
        for child in self.impl_getchildren():
            if isinstance(child, OptionDescription):
                raise ValueError(_("master group {0} shall not have "
                                   "a subgroup").format(self._name))
            if isinstance(child, SymLinkOption):
                raise ValueError(_("master group {0} shall not have "
                                   "a symlinkoption").format(self._name))
            if not child.impl_is_multi():
                raise ValueError(_("not allowed option {0} "
                                   "in group {1}"
                                   ": this option is not a multi"
                                   "").format(child._name, self._name))
            if child._name == self._name:
                master = child
            else:
                slaves.append(child)
        if master is None:
            raise ValueError(_('master group with wrong'
                               ' master name for {0}'
                               ).format(self._name))
        # validation is over: it is now safe to modify the children
        master._multitype = multitypes.master
        master._master_slaves = tuple(slaves)
        for child in self.impl_getchildren():
            if child != master:
                child._master_slaves = master
                child._multitype = multitypes.slave
    self._group_type = group_type
def impl_get_group_type(self):
    """Return the group type currently stored in ``self._group_type``."""
    current_group_type = self._group_type
    return current_group_type
def _valid_consistency(self, right_opt, right_val, context=None, index=None):
#[('_cons_not_equal', (opt1, opt2))]
consistencies = self._consistencies.get(right_opt)
if consistencies is not None:
for func, opts in consistencies:
#opts[0] is the option where func is set
#opts is left_opts
ret = opts[0]._launch_consistency(func, right_opt, right_val,
context, index, opts)
if ret is False:
return False
return True
def _impl_getstate(self, descr=None):
    """Enable us to export into a dict.

    Without ``descr`` the cache of self is built first and self is used
    as the root description. The group type is saved as a string in
    ``_state_group_type`` and the export goes on with every child.

    :param descr: parent :class:`tiramisu.option.OptionDescription`
    """
    root = descr
    if root is None:
        self.impl_build_cache()
        root = self
    super(OptionDescription, self)._impl_getstate(root)
    self._state_group_type = str(self._group_type)
    for child in self.impl_getchildren():
        child._impl_getstate(root)
def __getstate__(self):
    """Special method enabling the serialization with pickle.

    When the ``_stated`` attribute is missing, the state is first
    exported with ``_impl_getstate`` and False is handed to the parent
    ``__getstate__``; when it is present, True is handed over.
    """
    try:
        # `_stated` is only probed: its presence tells that the export
        # has already been launched
        self._stated
    except AttributeError:
        # not exported yet: launch the recursive export from here
        self._impl_getstate()
        stated = False
    else:
        stated = True
    return super(OptionDescription, self).__getstate__(stated)
def _impl_setstate(self, descr=None):
    """Enable us to import from a dict.

    Without ``descr`` the path cache of self is reset and rebuilt
    (without consistencies) and self is used as the root description.
    The group type is restored from ``_state_group_type``, which is
    then removed, and the import goes on with every child.

    :param descr: parent :class:`tiramisu.option.OptionDescription`
    """
    root = descr
    if root is None:
        self._cache_paths = None
        self.impl_build_cache(force_no_consistencies=True)
        root = self
    # the group type was exported as a string: look it up in `groups`
    saved_group_type = self._state_group_type
    self._group_type = getattr(groups, saved_group_type)
    del self._state_group_type
    super(OptionDescription, self)._impl_setstate(root)
    for child in self.impl_getchildren():
        child._impl_setstate(root)
def __setstate__(self, state):
    """Special method enabling the deserialization with pickle.

    The parent ``__setstate__`` is applied first; ``_impl_setstate`` is
    then launched only when the ``_stated`` attribute is missing.
    """
    super(OptionDescription, self).__setstate__(state)
    try:
        self._stated
    except AttributeError:
        needs_import = True
    else:
        needs_import = False
    if needs_import:
        self._impl_setstate()
def validate_requires_arg(requires, name):
    """Check malformed requirements and transform dicts to internal tuples.

    :param requires: iterable of requirement dicts (or None); have a look
                     at the
                     :meth:`tiramisu.setting.Settings.apply_requires`
                     method to know more about the description of the
                     requires dictionary
    :param name: name of the option owning the requirements, only used
                 in error messages
    :returns: ``(None, None)`` if ``requires`` is None, otherwise
              ``(actions, requirements)`` where ``actions`` is the
              frozenset of all actions and ``requirements`` a tuple
              holding, for each action, a tuple of
              ``(option, expected_values, action, inverse, transitive,
              same_action)``
    :raises ValueError: if a requirement is malformed
    """
    if requires is None:
        return None, None
    valid_keys = ('option', 'expected', 'action', 'inverse', 'transitive',
                  'same_action')
    # {action: {option: (option, [expected, ...], action, inverse,
    #                    transitive, same_action)}}
    ret_requires = {}
    # {action: inverse}, an action must always have the same inverse
    config_action = {}
    # start parsing all requires given by user (as dicts)
    for require in requires:
        if not isinstance(require, dict):
            raise ValueError(_("malformed requirements type for option:"
                               " {0}, must be a dict").format(name))
        unknown_keys = frozenset(require.keys()) - frozenset(valid_keys)
        if unknown_keys != frozenset():
            raise ValueError(_('malformed requirements for option: {0}'
                               ' unknown keys {1}, must only '
                               '{2}').format(name,
                                             unknown_keys,
                                             valid_keys))
        # prepare all attributes
        try:
            option = require['option']
            expected = require['expected']
            action = require['action']
        except KeyError:
            raise ValueError(_("malformed requirements for option: {0}"
                               " require must have option, expected and"
                               " action keys").format(name))
        inverse = require.get('inverse', False)
        if inverse not in [True, False]:
            raise ValueError(_('malformed requirements for option: {0}'
                               ' inverse must be boolean').format(name))
        transitive = require.get('transitive', True)
        if transitive not in [True, False]:
            raise ValueError(_('malformed requirements for option: {0}'
                               ' transitive must be boolean'
                               ).format(name))
        same_action = require.get('same_action', True)
        if same_action not in [True, False]:
            raise ValueError(_('malformed requirements for option: {0}'
                               ' same_action must be boolean'
                               ).format(name))
        if not isinstance(option, Option):
            raise ValueError(_('malformed requirements '
                               'must be an option in option {0}'
                               ).format(name))
        if option.impl_is_multi():
            raise ValueError(_('malformed requirements option {0} '
                               'should not be a multi').format(name))
        if expected is not None:
            try:
                option._validate(expected)
            except ValueError as err:
                raise ValueError(_('malformed requirements second argument '
                                   'must be valid for option {0}'
                                   ': {1}').format(name, err))
        if action in config_action:
            if inverse != config_action[action]:
                raise ValueError(_("inconsistency in action types"
                                   " for option: {0}"
                                   " action: {1}").format(name, action))
        else:
            config_action[action] = inverse
        by_option = ret_requires.setdefault(action, {})
        if option not in by_option:
            by_option[option] = (option, [expected], action,
                                 inverse, transitive, same_action)
        else:
            # same action and same option: just one more expected value
            by_option[option][1].append(expected)
    # transform dict to tuple
    ret = []
    for opt_requires in ret_requires.values():
        ret.append(tuple((req[0], tuple(req[1]), req[2],
                          req[3], req[4], req[5])
                         for req in opt_requires.values()))
    return frozenset(config_action.keys()), tuple(ret)
def validate_callback(callback, callback_params, type_):
    """Check a callback (or validator) function and its parameters.

    ``callback_params`` maps a key to a tuple of values. Only the ``''``
    key may carry several values, any other key must carry exactly one.
    A value that is itself a tuple must be an
    ``(option, force_permissive)`` pair.

    :param callback: the function, must be a plain Python function
    :param callback_params: dict of parameters or None
    :param type_: kind of function checked (for instance ``'validator'``),
                  used in error messages; for ``'validator'`` a pair with
                  a false ``force_permissive`` is refused
    :raises ValueError: if the function or its parameters are malformed
    """
    if type(callback) != FunctionType:
        raise ValueError(_('{0} should be a function').format(type_))
    if callback_params is None:
        return
    if not isinstance(callback_params, dict):
        raise ValueError(_('{0}_params should be a dict').format(type_))
    for key, callbacks in callback_params.items():
        # check the type first: len() of a value without length would
        # raise a TypeError instead of the expected ValueError
        if not isinstance(callbacks, tuple):
            raise ValueError(_('{0}_params should be tuple for key "{1}"'
                               ).format(type_, key))
        if key != '' and len(callbacks) != 1:
            raise ValueError(_('{0}_params with key {1} should not have '
                               'length different to 1').format(type_,
                                                               key))
        for callbk in callbacks:
            if not isinstance(callbk, tuple):
                continue
            option, force_permissive = callbk
            if type_ == 'validator' and not force_permissive:
                raise ValueError(_('validator not support tuple'))
            if not isinstance(option, (Option, SymLinkOption)):
                raise ValueError(_('{0}_params should have an option '
                                   'not a {1} for first argument'
                                   ).format(type_, type(option)))
            if force_permissive not in [True, False]:
                raise ValueError(_('{0}_params should have a boolean'
                                   ' not a {1} for second argument'
                                   ).format(type_, type(
                                       force_permissive)))