tiramisu/tiramisu/option.py

671 lines
27 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"option types and option description for the configuration management"
# Copyright (C) 2012 Team tiramisu (see AUTHORS for all contributors)
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# The original `Config` design model is unproudly borrowed from
# the rough pypy's guys: http://codespeak.net/svn/pypy/dist/pypy/config/
# the whole pypy project is under MIT licence
# ____________________________________________________________
from types import FunctionType
from tiramisu.basetype import HiddenBaseType, DisabledBaseType
from tiramisu.error import (ConfigError, ConflictConfigError, NotFoundError,
RequiresError, RequirementRecursionError, MandatoryError,
PropertiesOptionError)
from tiramisu.autolib import carry_out_calculation
from tiramisu.setting import settings, groups, owners
# each pair holds an action and the action that undoes it
requires_actions = [('hide', 'show'), ('enable', 'disable'), ('freeze', 'unfreeze')]
# flat list of every known action name, in declaration order
available_actions = []
# maps every action name to its opposite (both directions)
reverse_actions = {}
for act1, act2 in requires_actions:
    available_actions += [act1, act2]
    reverse_actions.update({act1: act2, act2: act1})
# ____________________________________________________________
# multi types
class Multi(list):
    """multi options values container
    that support item notation for the values of multi options

    Every mutation goes through ``_setvalue`` / ``_pop``, which record the
    previous value in ``config._cfgimpl_previous_values`` and may update the
    option's owner.
    """

    def __init__(self, lst, config, opt, force_append=True):
        """
        :param lst: the Multi wraps a list value
        :param config: the parent config
        :param opt: the option object that have this Multi value
        :param force_append: - True to append child value with master's one
                             - False to force lst value
        """
        self.config = config
        self.opt = opt
        if force_append and self.opt.is_master(config):
            # we pass the list at the list type's init
            # because a normal init cannot return anything
            super(Multi, self).__init__(lst)
            # we add the slaves without modifying the master
            for item in lst:
                self.append(item, add_master=False)
        else:
            if force_append:
                self.config._valid_len(self.opt._name, lst)
            super(Multi, self).__init__(lst)

    def __setitem__(self, key, value):
        "item assignment, validated and owned by the current owner"
        self._setvalue(value, key, who=settings.get_owner())

    def append(self, value, add_master=True):
        """the list value can be updated (appened)
        only if the option is a master

        :param value: the value to append
        :param add_master: adds slaves without modifiying the master option
                           if True, adds slaves **and** the master option
        :raises IndexError: when called on a slave of a master group
        """
        try:
            master = self.config._cfgimpl_descr.get_master_name()
        except TypeError:
            # Not a master/slaves
            self._setvalue(value, who=settings.get_owner())
            return
        if master != self.opt._name:
            raise IndexError("in a group with a master, you mustn't add "
                             "a value in a slave's Multi value")
        for opt in self.config._cfgimpl_descr._children:
            if isinstance(opt, OptionDescription):
                continue
            multi = self.config._cfgimpl_values[opt._name]
            if master == multi.opt._name:
                if add_master:
                    multi._setvalue(value, who=settings.get_owner())
            elif len(multi) == 0 or len(multi) < len(self):
                # a slave shorter than the master gets its default appended
                multi._append_default()

    def _append_default(self):
        "appends the option's ``default_multi`` value, owner untouched"
        default_value = self.opt.getdefault_multi()
        self._setvalue(default_value)

    def _setvalue(self, value, key=None, who=None):
        """validates and stores a value

        :param value: the new value (``None`` skips the validation)
        :param key: index to overwrite, or ``None`` to append
        :param who: the owner to record, or ``None`` to leave it unchanged
        :raises ConfigError: if the option rejects the value
        :raises TypeError: if ``who`` is not an ``owners.Owner``
        """
        if value is not None and not self.opt._validate(value):
            raise ConfigError("invalid value {0} "
                              "for option {1}".format(str(value), self.opt._name))
        oldvalue = list(self)
        if key is None:
            super(Multi, self).append(value)
        else:
            super(Multi, self).__setitem__(key, value)
        if who is not None:
            if not isinstance(who, owners.Owner):
                raise TypeError("invalid owner {0} for the value {1}".format(
                                str(who), str(value)))
            self.opt.setowner(self.config, getattr(owners, who))
        self.config._cfgimpl_previous_values[self.opt._name] = oldvalue

    def pop(self, key):
        """the list value can be updated (poped)
        only if the option is a master

        :param key: index of the element to pop
        :return: the requested element
        :raises IndexError: when called on a slave of a master group
        :raises ConfigError: if the master's own Multi was not found
        """
        try:
            master = self.config._cfgimpl_descr.get_master_name()
        except TypeError:
            # Not a master/slaves
            return self._pop(key)
        if master != self.opt._name:
            raise IndexError("in a group with a master, you mustn't remove "
                             "a value in a slave's Multi value")
        # snapshot the values first: popping happens while we walk them
        multis = [multi for _name, multi in self.config]
        # sentinel: the popped element itself may legitimately be None
        missing = object()
        ret = missing
        for multi in multis:
            if master == multi.opt._name:
                ret = multi._pop(key)
            else:
                # the value owner has to be updated because
                # the default value doesn't have the same length
                # of the new value
                change_who = len(multi.opt.getdefault()) >= len(multi)
                multi._pop(key, change_who=change_who)
        if ret is missing:
            raise ConfigError('Unexpected multi pop error: ret must be defined')
        return ret

    def _pop(self, key, change_who=True):
        """removes and returns the element at ``key``

        :param change_who: if True the current owner becomes the value owner
        """
        if change_who:
            self.opt.setowner(self.config, settings.get_owner())
        self.config._cfgimpl_previous_values[self.opt._name] = list(self)
        return super(Multi, self).pop(key)
# ____________________________________________________________
#
class Option(HiddenBaseType, DisabledBaseType):
    """
    Abstract base class for configuration option's.

    Reminder: an Option object is **not** a container for the value

    Subclasses provide ``_validate(value)``, which returns True when a
    single value is acceptable for the option type.
    """
    # freeze means: cannot modify the value of an Option once set
    _frozen = False
    # if an Option has been frozen, shall return the default value
    _force_default_on_freeze = False

    def __init__(self, name, doc, default=None, default_multi=None,
                 requires=None, mandatory=False, multi=False, callback=None,
                 callback_params=None, validator=None, validator_args=None):
        """
        :param name: the option's name
        :param doc: the option's description
        :param default: specifies the default value of the option,
                        for a multi : ['bla', 'bla', 'bla']
        :param default_multi: 'bla' (used in case of a reset to default only at
                        a given index)
        :param requires: is a list of names of options located anywhere
                         in the configuration.
        :param mandatory: stored as is and returned by ``is_mandatory()``
        :param multi: if true, the option's value is a list
        :param callback: the name of a function. If set, the function's output
                         is responsible of the option's value
        :param callback_params: the callback's parameter
        :param validator: the name of a function wich stands for a custom
                          validation of the value
        :param validator_args: the validator's parameters (a dict of keyword
                               arguments, ``None`` meaning no extra argument)
        :raises TypeError: if ``validator`` is not a plain function
        :raises ConfigError: on inconsistent or invalid default settings
        """
        self._name = name
        self.doc = doc
        self._requires = requires
        self._mandatory = mandatory
        self.multi = multi
        self._validator = None
        self._validator_args = None
        if validator is not None:
            if not isinstance(validator, FunctionType):
                raise TypeError("validator must be a function")
            self._validator = validator
            # always a dict once a validator is set: it is expanded with **
            if validator_args is None:
                self._validator_args = {}
            else:
                self._validator_args = validator_args
        if not self.multi and default_multi is not None:
            raise ConfigError("a default_multi is set whereas multi is False"
                              " in option: {0}".format(name))
        if default_multi is not None and not self._validate(default_multi):
            raise ConfigError("invalid default_multi value {0} "
                              "for option {1}".format(str(default_multi), name))
        self.default_multi = default_multi
        #if self.multi and default_multi is None:
        #    _cfgimpl_warnings[name] = DefaultMultiWarning
        if callback is not None and (default is not None or
                                     default_multi is not None):
            raise ConfigError("defaut values not allowed if option: {0} "
                              "is calculated".format(name))
        self.callback = callback
        if self.callback is None and callback_params is not None:
            raise ConfigError("params defined for a callback function but"
                              " no callback defined yet for option {0}".format(name))
        self.callback_params = callback_params
        if self.multi:
            if default is None:
                default = []
            if not isinstance(default, list):
                raise ConfigError("invalid default value {0} "
                                  "for option {1} : not list type".format(
                                      str(default), name))
            # the custom validator is not applied to the default value
            if not self.validate(default, False):
                raise ConfigError("invalid default value {0} "
                                  "for option {1}".format(str(default), name))
        else:
            if default is not None and not self.validate(default, False):
                raise ConfigError("invalid default value {0} "
                                  "for option {1}".format(str(default), name))
        self.default = default
        self.properties = []  # 'hidden', 'disabled'...

    def validate(self, value, validate=True):
        """
        :param value: the option's value
        :param validate: if true enables ``self._validator`` validation
        :return: True if the value (or every item of a multi) is valid
        :raises ConfigError: if the option is a multi and value is not a list
        """
        use_validator = validate and self._validator is not None
        if not self.multi:
            # None allows the reset of the value
            if value is not None:
                # customizing the validator
                if use_validator and \
                        not self._validator(value, **self._validator_args):
                    return False
                return self._validate(value)
        else:
            if not isinstance(value, list):
                raise ConfigError("invalid value {0} "
                                  "for option {1} which must be a list".format(
                                      value, self._name))
            for val in value:
                # None allows the reset of the value
                if val is not None:
                    # customizing the validator
                    if use_validator and \
                            not self._validator(val, **self._validator_args):
                        return False
                    if not self._validate(val):
                        return False
        return True

    def getdefault(self, default_multi=False):
        "accessing the default value"
        if not default_multi or not self.is_multi():
            return self.default
        return self.getdefault_multi()

    def getdefault_multi(self):
        "accessing the default value for a multi"
        return self.default_multi

    def is_empty_by_default(self):
        "no default value has been set yet"
        if self.is_multi():
            return self.default == [] or None in self.default
        return self.default is None

    def force_default(self):
        "if an Option has been frozen, shall return the default value"
        self._force_default_on_freeze = True

    def hascallback_and_isfrozen(self):
        "true when the option is frozen and has a callback"
        return self._frozen and self.has_callback()

    def is_forced_on_freeze(self):
        "if an Option has been frozen, shall return the default value"
        return self._frozen and self._force_default_on_freeze

    def getdoc(self):
        "accesses the Option's doc"
        return self.doc

    def getcallback(self):
        "a callback is only a link, the name of an external hook"
        return self.callback

    def has_callback(self):
        "to know if a callback has been defined or not"
        return self.callback is not None

    def getcallback_value(self, config):
        "delegates the computation of the value to ``carry_out_calculation``"
        return carry_out_calculation(self._name,
                                     option=self, config=config)

    def getcallback_params(self):
        "returns the parameters declared for the callback"
        return self.callback_params

    def setowner(self, config, owner):
        """
        :param config: *must* be only the **parent** config
                       (not the toplevel config)
        :param owner: is a **real** owner, that is an object
                      that lives in setting.owners
        :raises ConfigError: if ``owner`` is not an ``owners.Owner``
        """
        name = self._name
        if not isinstance(owner, owners.Owner):
            raise ConfigError("invalid type owner for option: {0}".format(
                              str(name)))
        config._cfgimpl_value_owners[name] = owner

    def getowner(self, config):
        "config *must* be only the **parent** config (not the toplevel config)"
        return config._cfgimpl_value_owners[self._name]

    def reset(self, config):
        """resets the default value and owner
        """
        config.setoption(self._name, self.getdefault(), owners.default)

    def is_default_owner(self, config):
        """
        :param config: *must* be only the **parent** config
                       (not the toplevel config)
        :return: boolean
        """
        return self.getowner(config) == owners.default

    def setoption(self, config, value):
        """changes the option's value with the value_owner's who

        :param config: the parent config is necessary here to store the value
        :param value: the new value
        :raises ConfigError: if the value does not validate
        :raises MandatoryError: if a mandatory option would become empty
        :raises AttributeError: if the option is unknown to ``config``
        :raises TypeError: if the option is frozen
        """
        name = self._name
        if not self.validate(value, settings.validator):
            raise ConfigError('invalid value %s for option %s' % (value, name))
        if self.is_mandatory():
            # value shall not be '' for a mandatory option
            # so '' is considered as being None
            if not self.is_multi() and value == '':
                value = None
            if self.is_multi() and '' in value:
                value = Multi([{'': None}.get(i, i) for i in value], config, self)
            if settings.is_mandatory() and (
                    (self.is_multi() and value == []) or
                    (not self.is_multi() and value is None)):
                raise MandatoryError('cannot change the value to %s for '
                                     'option %s' % (value, name))
        if name not in config._cfgimpl_values:
            raise AttributeError('unknown option %s' % (name))
        if settings.is_frozen() and self.is_frozen():
            raise TypeError('cannot change the value to %s for '
                            'option %s this option is frozen' % (str(value), name))
        apply_requires(self, config)
        current = config._cfgimpl_values[name]
        if isinstance(current, Multi):
            # keep a plain copy: the Multi itself may be mutated later
            config._cfgimpl_previous_values[name] = list(current)
        else:
            config._cfgimpl_previous_values[name] = current
        config._cfgimpl_values[name] = value

    def getkey(self, value):
        "the key of an option is the value itself"
        return value

    def is_master(self, config):
        """tells if this option is the master of its group

        Side effect: stores the group's master name in ``self.master``.
        """
        try:
            self.master = config._cfgimpl_descr.get_master_name()
        except TypeError:
            # raised by get_master_name() for a non-master group
            return False
        return self.master is not None and self.master == self._name

    # ____________________________________________________________
    # freeze utility
    def freeze(self):
        "marks the option as frozen"
        self._frozen = True
        return True

    def unfreeze(self):
        "removes the frozen mark"
        self._frozen = False

    def is_frozen(self):
        "tells if the option is marked as frozen"
        return self._frozen

    # ____________________________________________________________
    def is_multi(self):
        "tells if the option's value is a list"
        return self.multi

    def is_mandatory(self):
        "tells if the option has been declared mandatory"
        return self._mandatory
class ChoiceOption(Option):
    """option whose value is picked among ``values``

    With ``open_values`` set, any value is accepted.
    """
    opt_type = 'string'

    def __init__(self, name, doc, values, default=None, default_multi=None,
                 requires=None, mandatory=False, multi=False, callback=None,
                 callback_params=None, open_values=False, validator=None,
                 validator_args={}):
        """
        :param values: the allowed values
        :param open_values: if True the value is not restricted to ``values``
        :raises ConfigError: if ``open_values`` is not a boolean

        The other parameters are forwarded to ``Option.__init__``.
        """
        self.values = values
        if open_values not in (True, False):
            raise ConfigError('Open_values must be a boolean for '
                              '{0}'.format(name))
        self.open_values = open_values
        super(ChoiceOption, self).__init__(
            name, doc,
            default=default,
            default_multi=default_multi,
            requires=requires,
            mandatory=mandatory,
            multi=multi,
            callback=callback,
            callback_params=callback_params,
            validator=validator,
            validator_args=validator_args)

    def _validate(self, value):
        "``None`` and any member of ``values`` are valid (anything if open)"
        if self.open_values:
            return True
        return value is None or value in self.values
class BoolOption(Option):
    "option holding a boolean value"
    opt_type = 'bool'

    def _validate(self, value):
        "only ``bool`` instances are valid"
        is_bool = isinstance(value, bool)
        return is_bool
class IntOption(Option):
    "option holding an integer value"
    opt_type = 'int'

    def _validate(self, value):
        "only ``int`` instances are valid (``bool`` is a subclass of ``int``)"
        is_int = isinstance(value, int)
        return is_int
class FloatOption(Option):
    "option holding a float value"
    opt_type = 'float'

    def _validate(self, value):
        "only ``float`` instances are valid"
        is_float = isinstance(value, float)
        return is_float
class StrOption(Option):
    "option holding a string value"
    opt_type = 'string'

    def _validate(self, value):
        "only ``str`` instances are valid"
        is_str = isinstance(value, str)
        return is_str
class SymLinkOption(object):
    """alias of another option

    Attributes that the link does not define itself are looked up on the
    wrapped option ``opt``.
    """
    opt_type = 'symlink'

    def __init__(self, name, path, opt):
        """
        :param name: the name of the link itself
        :param path: the attribute path used to set the value on a config
        :param opt: the option the link points to
        """
        self._name = name
        self.path = path
        self.opt = opt

    def setoption(self, config, value):
        "sets the value on ``config`` under the linked ``path``"
        setattr(config, self.path, value)

    def __getattr__(self, name):
        """fallback lookup, only called when the normal lookup failed

        :raises AttributeError: if one of the link's own attributes is missing
        """
        if name in ('_name', 'path', 'opt', 'setoption'):
            # the link's own attributes are never delegated; when they are
            # missing (e.g. instance created without __init__) report it the
            # standard way so hasattr()/getattr(..., default) keep working
            try:
                return self.__dict__[name]
            except KeyError:
                raise AttributeError(name)
        return getattr(self.opt, name)
class IPOption(Option):
    "option meant to hold an IP address"
    opt_type = 'ip'

    def _validate(self, value):
        "only checks that the value is a ``str``"
        # by now the validation is nothing but a string, use IPy instead
        is_str = isinstance(value, str)
        return is_str
class NetmaskOption(Option):
    "option meant to hold a netmask"
    opt_type = 'netmask'

    def _validate(self, value):
        "only checks that the value is a ``str``"
        # by now the validation is nothing but a string, use IPy instead
        is_str = isinstance(value, str)
        return is_str
class OptionDescription(HiddenBaseType, DisabledBaseType):
    """Config's schema (organisation, group) and container of Options"""
    # the group_type is useful for filtering OptionDescriptions in a config
    group_type = groups.default

    def __init__(self, name, doc, children, requires=None):
        """
        :param name: the name of the group
        :param doc: the description of the group
        :param children: is a list of option descriptions (including
                         ``OptionDescription`` instances for nested namespaces).
        :param requires: the requirements of the group, stored as is
        """
        self._name = name
        self.doc = doc
        self._children = children
        self._requires = requires
        self._build()
        self.properties = []  # 'hidden', 'disabled'...
        # if this group is a master group, master is set
        # to the master option name. it's just a ref to a name
        self.master = None

    def getdoc(self):
        "accesses the group's doc"
        return self.doc

    def _build(self):
        "exposes every child as an attribute named after the child"
        for child in self._children:
            setattr(self, child._name, child)

    def add_child(self, child):
        """dynamically adds a configuration option

        :raises ConflictConfigError: if an option with this name exists
        """
        #Nothing is static. Even the Mona Lisa is falling apart.
        # NOTE(review): only ``Option`` children are checked, a clash with a
        # nested OptionDescription goes undetected -- confirm this is intended
        for ch in self._children:
            if isinstance(ch, Option) and child._name == ch._name:
                raise ConflictConfigError("existing option : {0}".format(
                                          child._name))
        self._children.append(child)
        setattr(self, child._name, child)

    def update_child(self, child):
        "modification of an existing option"
        # XXX : corresponds to the `redefine`, is it usefull
        pass

    def getkey(self, config):
        "tuple of the children's keys, read from the values held by ``config``"
        return tuple([child.getkey(getattr(config, child._name))
                      for child in self._children])

    def getpaths(self, include_groups=False, currpath=None):
        """returns a list of all paths in self, recursively

        currpath should not be provided (helps with recursion)

        :param include_groups: if True the paths of the nested groups
                               are listed too
        """
        if currpath is None:
            currpath = []
        paths = []
        for option in self._children:
            attr = option._name
            if attr.startswith('_cfgimpl'):
                continue
            if isinstance(option, OptionDescription):
                if include_groups:
                    paths.append('.'.join(currpath + [attr]))
                currpath.append(attr)
                paths += option.getpaths(include_groups=include_groups,
                                         currpath=currpath)
                currpath.pop()
            else:
                paths.append('.'.join(currpath + [attr]))
        return paths

    # ____________________________________________________________
    def set_group_type(self, group_type, master=None):
        """sets a given group object to an OptionDescription

        :param group_type: an instance of `GroupType` or `MasterGroupType`
                           that lives in `setting.groups`
        :param master: the name of the master option, required for a
                       `MasterGroupType` and forbidden otherwise
        :raises ConfigError: on an unknown group type, or when ``master``
                             does not fit the group type
        """
        # everything is checked before anything is stored, so that a
        # rejected call leaves the description untouched
        if not isinstance(group_type, groups.GroupType):
            raise ConfigError('not allowed group_type : {0}'.format(group_type))
        if isinstance(group_type, groups.MasterGroupType):
            if master is None:
                raise ConfigError('this group type ({0}) needs a master '
                                  'for OptionDescription {1}'.format(
                                      group_type, self._name))
        elif master is not None:
            raise ConfigError("this group type ({0}) doesn't need a "
                              "master for OptionDescription {1}".format(
                                  group_type, self._name))
        self.group_type = group_type
        self.master = master

    def get_group_type(self):
        "accesses the group type"
        return self.group_type

    def get_master_name(self):
        """returns the name of the master option

        :raises TypeError: if the group has no master
        """
        if self.master is None:
            raise TypeError('get_master_name() shall not be called in case of '
                            'non-master OptionDescription')
        return self.master

    # ____________________________________________________________
    # actions API
    # each action is applied to the group itself, then forwarded to the
    # nested groups only (plain options are left untouched)
    def hide(self):
        "hides the group and its nested groups"
        super(OptionDescription, self).hide()
        for child in self._children:
            if isinstance(child, OptionDescription):
                child.hide()

    def show(self):
        "shows the group and its nested groups"
        super(OptionDescription, self).show()
        for child in self._children:
            if isinstance(child, OptionDescription):
                child.show()

    def disable(self):
        "disables the group and its nested groups"
        super(OptionDescription, self).disable()
        for child in self._children:
            if isinstance(child, OptionDescription):
                child.disable()

    def enable(self):
        "enables the group and its nested groups"
        super(OptionDescription, self).enable()
        for child in self._children:
            if isinstance(child, OptionDescription):
                child.enable()
# ____________________________________________________________
def validate_requires_arg(requires, name):
    """checks that the requirements of an option are well formed

    :param requires: iterable of ``(path, expected value, action)`` triples
    :param name: the option's name, only used in the error messages
    :raises RequiresError: if a requirement is not a 3 items tuple/list,
                           uses an unknown action, or uses an action whose
                           opposite is already used by a previous requirement
    """
    config_action = []
    for req in requires:
        # both the type and the length have to be right
        if not isinstance(req, (tuple, list)) or len(req) != 3:
            raise RequiresError("malformed requirements for option:"
                                " {0}".format(name))
        action = req[2]
        if action not in available_actions:
            raise RequiresError("malformed requirements for option: {0} "
                                "unknown action: {1}".format(name, action))
        if reverse_actions[action] in config_action:
            raise RequiresError("inconsistency in action types for option: {0} "
                                "action: {1} in contradiction with {2}\n"
                                " ({3})".format(name, action,
                                                reverse_actions[action], requires))
        config_action.append(action)
def build_actions(requires):
    """groups the requirements by action (hide, show, enable, disable...)

    :param requires: iterable of requirements, the action being the third item
    :return: dict mapping each action to the list of its requirements
    """
    by_action = {}
    for requirement in requires:
        key = requirement[2]
        if key not in by_action:
            by_action[key] = []
        by_action[key].append(requirement)
    return by_action
def apply_requires(opt, config, permissive=False):
    """carries out the jit (just in time) requirements between options

    The requirements of ``opt`` are grouped by action. Within a group, every
    requirement whose target option holds the expected value triggers the
    action on ``opt``; if none matches, the reverse action is applied.

    :param opt: the option (or group) whose requirements are evaluated
    :param config: the parent config of ``opt``
    :param permissive: if True, the properties listed in
                       ``settings.permissive`` do not block the reading of
                       a required option
    :raises RequirementRecursionError: if a requirement targets ``opt`` itself
    :raises NotFoundError: if a required option cannot be read
    """
    if not hasattr(opt, '_requires') or opt._requires is None:
        return
    rootconfig = config._cfgimpl_get_toplevel()
    validate_requires_arg(opt._requires, opt._name)
    # filters the callbacks
    trigger_actions = build_actions(opt._requires)
    for requires in trigger_actions.values():
        matches = False
        for require in requires:
            name, expected, action = require
            path = config._cfgimpl_get_path() + '.' + opt._name
            if name.startswith(path):
                raise RequirementRecursionError("malformed requirements "
                      "imbrication detected for option: '{0}' "
                      "with requirement on: '{1}'".format(path, name))
            homeconfig, shortname = rootconfig._cfgimpl_get_home_by_path(name)
            try:
                value = homeconfig._getattr(shortname, permissive=True)
            except PropertiesOptionError as err:
                properties = err.proptype
                if permissive:
                    for perm in settings.permissive:
                        if perm in properties:
                            properties.remove(perm)
                if properties != []:
                    raise NotFoundError("option '{0}' has requirement's property error: "
                                        "{1} {2}".format(opt._name, name, properties))
                # every blocking property is permissive: no value could be
                # read, so this requirement is treated as not matching
                # NOTE(review): confirm that skipping is the wanted behaviour
                continue
            except Exception:
                raise NotFoundError("required option not found: "
                                    "{0}".format(name))
            if value == expected:
                getattr(opt, action)()  # .hide() or show() or...
                # FIXME generic programming opt.property_launch(action, False)
                matches = True
        # no callback has been triggered, then just reverse the action
        # (all the requirements of a group share the same action)
        if not matches:
            getattr(opt, reverse_actions[action])()