# -*- coding: utf-8 -*-
"option types and option description for the configuration management"
# Copyright (C) 2012 Team tiramisu (see AUTHORS for all contributors)
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
#
# The original `Config` design model is unproudly borrowed from
# the rough pypy's guys: http://codespeak.net/svn/pypy/dist/pypy/config/
# the whole pypy project is under MIT licence
# ____________________________________________________________
from tiramisu.basetype import HiddenBaseType, DisabledBaseType
from tiramisu.error import (ConfigError, ConflictConfigError, NotFoundError,
                            RequiresError, RequirementRecursionError,
                            MandatoryError)

# each pair is (action, reverse action)
requires_actions = [('hide', 'show'), ('enable', 'disable'),
                    ('freeze', 'unfreeze')]

# flat list of every known action name
available_actions = []
# maps every action name to its opposite
reverse_actions = {}
for act1, act2 in requires_actions:
    available_actions.extend([act1, act2])
    reverse_actions[act1] = act2
    reverse_actions[act2] = act1
# ____________________________________________________________
# OptionDescription authorized group_type values
group_types = ['default', 'family', 'group', 'master']


# ____________________________________________________________
# multi types
class Multi(list):
    "container that support items for the values of list (multi) options"

    def __init__(self, lst, config, child):
        """
        :param lst: initial values
        :param config: the config object holding the value (its
                       `_cfgimpl_owner` and `_cfgimpl_previous_values`
                       attributes are used by `setoption`)
        :param child: the option this container stores values for
        """
        self.config = config
        self.child = child
        super(Multi, self).__init__(lst)

    def __setitem__(self, key, value):
        "replaces the item at `key`, going through `setoption`"
        return self.setoption(value, key)

    def append(self, value):
        "appends `value`, going through `setoption`"
        self.setoption(value)

    def setoption(self, value, key=None):
        """sets (or appends when `key` is None) a value and records its owner

        A `None` value is replaced by the option's default at index `key`
        when there is one, otherwise by the option's `default_multi`; the
        owner recorded for that item is then 'default'.

        :raises ConfigError: if an explicit value is rejected by the
                             option's `_validate`
        """
        # None is replaced by default_multi
        if value is None:
            defval = self.child.getdefault()
            if key is not None and len(defval) > key:
                value = defval[key]
            else:
                value = self.child.default_multi
            who = 'default'
        else:
            who = self.config._cfgimpl_owner
            if not self.child._validate(value):
                raise ConfigError("invalid value {0} "
                    "for option {1}".format(str(value), self.child._name))
        # snapshot taken *before* the modification
        oldvalue = list(self)
        oldowner = self.child.getowner(self.config)
        if key is None:
            ret = super(Multi, self).append(value)
            # keeps the previous values the same length as the new ones
            oldvalue.append(None)
            oldowner.append(who)
        else:
            ret = super(Multi, self).__setitem__(key, value)
            oldowner[key] = who
        self.config._cfgimpl_previous_values[self.child._name] = oldvalue
        self.child.setowner(self.config, oldowner)
        return ret

    def pop(self, key=-1):
        """removes the item at `key` together with its owner

        :param key: index to remove (last item by default, like `list.pop`)
        :returns: the removed value
        """
        oldowner = self.child.getowner(self.config)
        # the owner goes first: a bad index fails before any value is lost
        oldowner.pop(key)
        self.child.setowner(self.config, oldowner)
        return super(Multi, self).pop(key)


# ____________________________________________________________
#
class Option(HiddenBaseType, DisabledBaseType):
    """base class of the option types

    Subclasses are expected to provide `_validate(value)`, which returns
    a true value when a single (non list) value is acceptable.
    """
    #reminder: an Option object is **not** a container for the value
    _frozen = False
    _force_default_on_freeze = False

    def __init__(self, name, doc, default=None, default_multi=None,
                 requires=None, mandatory=False, multi=False, callback=None,
                 callback_params=None):
        """
        :param name: the option's name
        :param doc: the option's description
        :param default: default value (a list for a multi option)
        :param default_multi: value used for an item of a multi option when
                              no other default is available
        :param requires: requirements, see `apply_requires`
        :param mandatory: stored and returned by `is_mandatory`
        :param multi: True if the option holds a list of values
        :param callback: stored as-is; an option with a callback cannot
                         have default values
        :param callback_params: stored as-is; only allowed with a callback
        :raises ConfigError: on inconsistent or invalid arguments
        """
        self._name = name
        self.doc = doc
        self._requires = requires
        self._mandatory = mandatory
        self.multi = multi
        if not self.multi and default_multi is not None:
            raise ConfigError("a default_multi is set whereas multi is False"
                  " in option: {0}".format(name))
        if default_multi is not None and not self._validate(default_multi):
            raise ConfigError("invalid default_multi value {0} "
                    "for option {1}".format(str(default_multi), name))
        self.default_multi = default_multi
        #if self.multi and default_multi is None:
        #    _cfgimpl_warnings[name] = DefaultMultiWarning
        if callback is not None and (default is not None or
                                     default_multi is not None):
            raise ConfigError("defaut values not allowed if option: {0} "
                "is calculated".format(name))
        self.callback = callback
        if self.callback is None and callback_params is not None:
            raise ConfigError("params defined for a callback function but"
            " no callback defined yet for option {0}".format(name))
        self.callback_params = callback_params
        if self.is_multi():
            if default is None:
                default = []
            if not isinstance(default, list) or not self.validate(default):
                raise ConfigError("invalid default value {0} "
                "for option {1} : not list type".format(str(default), name))
        else:
            if default is not None and not self.validate(default):
                raise ConfigError("invalid default value {0} "
                                  "for option {1}".format(str(default), name))
        self.default = default
        self.properties = []  # 'hidden', 'disabled'...

    def validate(self, value):
        """validates a whole value (a list for a multi option)

        `None` (or a `None` item in a list) is always accepted because it
        allows the reset of the value.

        :returns: the result of `_validate` for a single value, False as
                  soon as one item of a list is rejected, True otherwise
        :raises ConfigError: if the option is multi and `value` is not a list
        """
        if not self.is_multi():
            # None allows the reset of the value
            if value is not None:
                return self._validate(value)
        else:
            if not isinstance(value, list):
                raise ConfigError("invalid value {0} "
                        "for option {1} which must be a list".format(value,
                        self._name))
            for val in value:
                # None allows the reset of the value
                if val is not None and not self._validate(val):
                    return False
        return True

    def getdefault(self):
        "returns the default value"
        return self.default

    def is_empty_by_default(self):
        """tells whether the option has no usable default value

        A single-valued option is empty when its default is None; a multi
        option is empty when its default is None, an empty list, or a list
        containing None.
        """
        if not self.is_multi():
            # `None in self.default` must not be evaluated here: it raises
            # TypeError for a non-container default (an int for instance)
            return self.default is None
        default = self.default
        return default is None or default == [] or None in default

    def force_default(self):
        "marks the option so that `is_forced_on_freeze` is true once frozen"
        self._force_default_on_freeze = True

    def hascallback_and_isfrozen(self):
        "True if the option is frozen and has a callback"
        return self._frozen and self.has_callback()

    def is_forced_on_freeze(self):
        "True if the option is frozen and `force_default` has been called"
        return self._frozen and self._force_default_on_freeze

    def getdoc(self):
        "returns the option's description"
        return self.doc

    def getcallback(self):
        "returns the callback (or None)"
        return self.callback

    def has_callback(self):
        "True if a callback has been defined"
        return self.callback is not None

    def getcallback_params(self):
        "returns the callback's parameters (or None)"
        return self.callback_params

    def setowner(self, config, owner):
        """records `owner` in `config._cfgimpl_value_owners`

        :raises ConfigError: if the option is multi and `owner` is not a list
        """
        # config *must* be only the **parent** config (not the toplevel config)
        # owner is a **real* owner, a list is actually allowable here
        name = self._name
        if self.is_multi() and not isinstance(owner, list):
            raise ConfigError("invalid owner for multi "
                "option: {0}".format(name))
        config._cfgimpl_value_owners[name] = owner

    def getowner(self, config):
        "returns the owner recorded in `config._cfgimpl_value_owners`"
        # config *must* be only the **parent** config (not the toplevel config)
        return config._cfgimpl_value_owners[self._name]

    def setoption(self, config, value, who):
        """validates `value` and stores it in `config._cfgimpl_values`

        who is **not necessarily** a owner because it cannot be a list

        The value previously stored is saved in
        `config._cfgimpl_previous_values`, and the option's requirements
        are applied before the new value is written.

        :raises ConfigError: if the value is not valid
        :raises MandatoryError: if the option and the config are mandatory
                                and the value is empty
        :raises AttributeError: if the option is unknown in `config`
        :raises TypeError: if both the config and the option are frozen
        """
        name = self._name
        if not self.validate(value):
            raise ConfigError('invalid value %s for option %s' % (value, name))
        if self.is_mandatory():
            # value shall not be '' for a mandatory option
            # so '' is considered as being None
            if not self.is_multi() and value == '':
                value = None
            if self.is_multi() and '' in value:
                value = Multi([None if item == '' else item
                               for item in value], config, self)
            if config.is_mandatory() and (
                    (self.is_multi() and value == []) or
                    (not self.is_multi() and value is None)):
                raise MandatoryError('cannot change the value to %s for '
              'option %s' % (value, name))
        if name not in config._cfgimpl_values:
            raise AttributeError('unknown option %s' % (name))

        if config.is_frozen() and self.is_frozen():
            raise TypeError('cannot change the value to %s for '
               'option %s' % (str(value), name))
        if who == "default":
            # changes the default value (and therefore resets the previous
            # value)
            # NOTE(review): `_validate` is the single-value check, so a list
            # (multi option) or None is rejected here by most option types
            # although `validate` accepted it above -- confirm the intent
            if self._validate(value):
                self.default = value
            else:
                raise ConfigError("invalid value %s for option %s" % (value,
                                                                      name))
        apply_requires(self, config)
        # FIXME put the validation for the multi somewhere else
#        # it is a multi **and** it has requires
#        if self.multi == True:
#            if type(value) != list:
#                raise TypeError("value {0} must be a list".format(value))
#            if self._requires is not None:
#                for reqname in self._requires:
#                    # FIXME : verify that the slaves are all multi
#                    #option = getattr(config._cfgimpl_descr, reqname)
#    #                if not option.multi == True:
#    #                    raise ConflictConfigError("an option with requires "
#    #                     "has to be a list type : {0}".format(name))
#                    if len(config._cfgimpl_values[reqname]) != len(value):
#                        raise ConflictConfigError("an option with requires "
#                         "has not the same length of the others "
#                         "in the group : {0}".format(reqname))
        current = config._cfgimpl_values[name]
        if type(current) == Multi:
            # a copy, otherwise later changes to the Multi would leak into
            # the previous value
            config._cfgimpl_previous_values[name] = list(current)
        else:
            config._cfgimpl_previous_values[name] = current
        config._cfgimpl_values[name] = value

    def getkey(self, value):
        "returns `value` unchanged"
        return value
    # ____________________________________________________________

    def freeze(self):
        "marks the option as frozen, returns True"
        self._frozen = True
        return True

    def unfreeze(self):
        "marks the option as not frozen"
        self._frozen = False

    def is_frozen(self):
        "True if the option is frozen"
        return self._frozen
    # ____________________________________________________________

    def is_multi(self):
        "returns the `multi` flag given to the constructor"
        return self.multi

    def is_mandatory(self):
        "returns the `mandatory` flag given to the constructor"
        return self._mandatory


class ChoiceOption(Option):
    "option whose value must be one of `values`, unless `open_values` is set"
    opt_type = 'string'

    def __init__(self, name, doc, values, default=None,
                 requires=None, callback=None, callback_params=None,
                 multi=False, mandatory=False, open_values=False):
        """
        :param values: the allowed values
        :param open_values: if True, any value is accepted
        :raises ConfigError: if `open_values` is not a boolean

        The other parameters are passed to `Option.__init__`.
        """
        # must be set before Option.__init__, which validates the default
        self.values = values
        if open_values not in [True, False]:
            raise ConfigError('Open_values must be a boolean for '
                              '{0}'.format(name))
        self.open_values = open_values
        super(ChoiceOption, self).__init__(name, doc, default=default,
                           callback=callback, callback_params=callback_params,
                           requires=requires, multi=multi, mandatory=mandatory)

    def _validate(self, value):
        "accepts anything if `open_values`, else None or one of `values`"
        if self.open_values:
            return True
        return value is None or value in self.values


class BoolOption(Option):
    "option accepting `bool` values"
    opt_type = 'bool'

    def _validate(self, value):
        return isinstance(value, bool)

# config level validator
#    def setoption(self, config, value, who):
#        name = self._name
#        if value and self._validator is not None:
#            toplevel = config._cfgimpl_get_toplevel()
#            self._validator(toplevel)
#        super(BoolOption, self).setoption(config, value, who)


class IntOption(Option):
    "option accepting `int` values"
    opt_type = 'int'

    def _validate(self, value):
        return isinstance(value, int)


class FloatOption(Option):
    "option accepting `float` values"
    opt_type = 'float'

    def _validate(self, value):
        return isinstance(value, float)


class StrOption(Option):
    "option accepting `str` values"
    opt_type = 'string'

    def _validate(self, value):
        return isinstance(value, str)


class SymLinkOption(object):
    "option that forwards the values it receives to another path"
    opt_type = 'symlink'

    def __init__(self, name, path):
        """
        :param name: the option's name
        :param path: attribute name set on the config by `setoption`
        """
        self._name = name
        self.path = path

    def setoption(self, config, value, who):
        "sets `value` on `config` under `self.path` (`who` is not used)"
        setattr(config, self.path, value)  # .setoption(self.path, value, who)


class IPOption(Option):
    "option for an IP, currently any `str` is accepted"
    opt_type = 'ip'

    def _validate(self, value):
        # by now the validation is nothing but a string, use IPy instead
        return isinstance(value, str)


class NetmaskOption(Option):
    "option for a netmask, currently any `str` is accepted"
    opt_type = 'netmask'

    def _validate(self, value):
        # by now the validation is nothing but a string, use IPy instead
        return isinstance(value, str)


class ArbitraryOption(Option):
    "option accepting any value, optionally with a factory for the default"

    def __init__(self, name, doc, default=None, defaultfactory=None,
                 requires=None, multi=False, mandatory=False):
        """
        :param default: default value (must be None if `defaultfactory`
                        is given)
        :param defaultfactory: callable without argument, called by
                               `getdefault` on each access

        The other parameters are passed to `Option.__init__`.
        """
        # `default` has to be forwarded, otherwise it is silently dropped
        # and `getdefault` always returns None
        super(ArbitraryOption, self).__init__(name, doc, default=default,
                                              requires=requires,
                                              multi=multi, mandatory=mandatory)
        self.defaultfactory = defaultfactory
        if defaultfactory is not None:
            assert default is None

    def _validate(self, value):
        return True

    def getdefault(self):
        "returns a value built by `defaultfactory` if any, else the default"
        if self.defaultfactory is not None:
            return self.defaultfactory()
        return self.default


class OptionDescription(HiddenBaseType, DisabledBaseType):
    "named group of options and of other descriptions"
    group_type = 'default'

    def __init__(self, name, doc, children, requires=None):
        """
        :param name: the description's name
        :param doc: the description's documentation
        :param children: list of options and descriptions; each one is also
                         made available as an attribute named after it
        :param requires: requirements, stored as-is
        """
        self._name = name
        self.doc = doc
        self._children = children
        self._requires = requires
        self._build()
        self.properties = []  # 'hidden', 'disabled'...

    def getdoc(self):
        "returns the description's documentation"
        return self.doc

    def _build(self):
        "exposes every child as an attribute named after the child"
        for child in self._children:
            setattr(self, child._name, child)

    def add_child(self, child):
        """dynamically adds a configuration option

        :raises ConflictConfigError: if an `Option` child with the same
                                     name already exists
        """
        #Nothing is static. Even the Mona Lisa is falling apart.
        # NOTE(review): only `Option` children are checked; a name already
        # used by a sub-description or a symlink is not detected -- confirm
        for existing in self._children:
            if isinstance(existing, Option) and child._name == existing._name:
                raise ConflictConfigError("existing option : {0}".format(
                                          child._name))
        self._children.append(child)
        setattr(self, child._name, child)

    def update_child(self, child):
        "modification of an existing option"
        # XXX : corresponds to the `redefine`, is it usefull
        pass

    def getkey(self, config):
        "returns the tuple of the children's keys for the values in `config`"
        return tuple([child.getkey(getattr(config, child._name))
                      for child in self._children])

    def getpaths(self, include_groups=False, currpath=None):
        """returns a list of all paths in self, recursively
           currpath should not be provided (helps with recursion)

        :param include_groups: if True, the paths of the sub-descriptions
                               themselves are listed too
        """
        if currpath is None:
            currpath = []
        paths = []
        for option in self._children:
            attr = option._name
            if attr.startswith('_cfgimpl'):
                continue
            if not isinstance(option, OptionDescription):
                paths.append('.'.join(currpath + [attr]))
                continue
            if include_groups:
                paths.append('.'.join(currpath + [attr]))
            # currpath is shared with the recursive call, then restored
            currpath.append(attr)
            paths += option.getpaths(include_groups=include_groups,
                                     currpath=currpath)
            currpath.pop()
        return paths
    # ____________________________________________________________

    def set_group_type(self, group_type):
        """sets the group type

        :raises ConfigError: if `group_type` is not one of `group_types`
        """
        if group_type not in group_types:
            raise ConfigError('not allowed value for group_type : {0}'.format(
                              group_type))
        self.group_type = group_type

    def get_group_type(self):
        "returns the group type"
        return self.group_type
    # ____________________________________________________________

    def _subdescriptions(self):
        "returns the children that are themselves descriptions"
        return [child for child in self._children
                if isinstance(child, OptionDescription)]

    def hide(self):
        "hides the description and, recursively, its sub-descriptions"
        super(OptionDescription, self).hide()
        # FIXME : AND THE SUBCHILDREN ?
        for child in self._subdescriptions():
            child.hide()

    def show(self):
        "shows the description and, recursively, its sub-descriptions"
        # FIXME : AND THE SUBCHILDREN ??
        super(OptionDescription, self).show()
        for child in self._subdescriptions():
            child.show()
    # ____________________________________________________________

    def disable(self):
        "disables the description and, recursively, its sub-descriptions"
        super(OptionDescription, self).disable()
        # FIXME : AND THE SUBCHILDREN ?
        for child in self._subdescriptions():
            child.disable()

    def enable(self):
        "enables the description and, recursively, its sub-descriptions"
        # FIXME : AND THE SUBCHILDREN ?
        super(OptionDescription, self).enable()
        for child in self._subdescriptions():
            child.enable()


# ____________________________________________________________
def validate_requires_arg(requires, name):
    """checks that the requirements of the option `name` are well formed

    Each requirement must be a 3 items sequence whose last item is one of
    `available_actions`, and an action cannot be listed together with its
    reverse action.

    :raises RequiresError: if a requirement is malformed or contradictory
    """
    # malformed requirements
    config_action = []
    for req in requires:
        # `or`, not `and`: a tuple of the wrong length has to be rejected
        # too (lists are still accepted, as they always have been)
        if not isinstance(req, (tuple, list)) or len(req) != 3:
            raise RequiresError("malformed requirements for option:"
                                           " {0}".format(name))
        action = req[2]
        if action not in available_actions:
            raise RequiresError("malformed requirements for option: {0}"
                                "unknown action: {1}".format(name, action))
        if reverse_actions[action] in config_action:
            raise RequiresError("inconsistency in action types for option: {0}"
                                "action: {1} in contradiction with {2}\n"
                                " ({3})".format(name, action,
                                    reverse_actions[action], requires))
        config_action.append(action)


def build_actions(requires):
    "groups the requirements by action: {action: [requirement, ...]}"
    trigger_actions = {}
    for require in requires:
        action = require[2]
        trigger_actions.setdefault(action, []).append(require)
    return trigger_actions


def apply_requires(opt, config):
    """applies the requirements of `opt`

    A requirement is a `(path, expected, action)` sequence. For each
    action, if the value found at `path` equals `expected` for at least
    one requirement, the method `action` of `opt` is called; if none
    matches, the reverse action is called instead.

    :raises RequiresError: if the requirements are malformed
    :raises RequirementRecursionError: if a requirement's path starts with
                                       the option's own path
    :raises NotFoundError: if a required option does not exist
    """
    if getattr(opt, '_requires', None) is None:
        return
    rootconfig = config._cfgimpl_get_toplevel()
    validate_requires_arg(opt._requires, opt._name)
    # filters the callbacks
    trigger_actions = build_actions(opt._requires)
    for action, requires in trigger_actions.items():
        matches = False
        for name, expected, _ in requires:
            path = config._cfgimpl_get_path() + '.' + opt._name
            if name.startswith(path):
                raise RequirementRecursionError("malformed requirements "
                      "imbrication detected for option: '{0}' "
                      "with requirement on: '{1}'".format(path, name))
            homeconfig, shortname = \
                rootconfig._cfgimpl_get_home_by_path(name)
            if shortname not in homeconfig._cfgimpl_values:
                # option doesn't exist ! should not happen...
                raise NotFoundError("required option not found: "
                                                     "{0}".format(name))
            if homeconfig._cfgimpl_values[shortname] == expected:
                getattr(opt, action)()  # .hide() or show() or...
                # FIXME generic programming
                opt.property_launch(action, False)
                matches = True
        # no callback has been triggered, then just reverse the action
        if not matches:
            getattr(opt, reverse_actions[action])()