# -*- coding: utf-8 -*-
"pretty small and local configuration management tool"
# Copyright (C) 2012 Team tiramisu (see AUTHORS for all contributors)
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
#
# The original `Config` design model is unproudly borrowed from
# the rough pypy's guys: http://codespeak.net/svn/pypy/dist/pypy/config/
# the whole pypy project is under MIT licence
# ____________________________________________________________
from copy import copy
from tiramisu.error import (PropertiesOptionError, ConfigError, NotFoundError,
    AmbigousOptionError, ConflictConfigError, NoMatchingOptionFound,
    MandatoryError, MethodCallError)
from tiramisu.option import (OptionDescription, Option, SymLinkOption,
    group_types, Multi, apply_requires)
from tiramisu.autolib import carry_out_calculation

# ______________________________________________________________________
# generic owner. 'default' is the general config owner after init time
default_owner = 'user'


# ____________________________________________________________
class Config(object):
    """Holds the values of the options described by an `OptionDescription`.

    A `Config` mirrors the schema tree: every `Option` child gets a value
    (and an owner) and every `OptionDescription` child gets a nested
    `Config`. Values are read and written with the attribute notation,
    optionally with a dotted path (``config.group.option``).
    """
    # properties attribute: the name of a property enables this property.
    # This class-level list is only the default; every instance works on its
    # own copy (see `__init__`).
    _cfgimpl_properties = ['hidden', 'disabled']
    # mandatory means: a mandatory option has to have a value that is not None
    _cfgimpl_mandatory = True
    _cfgimpl_frozen = True
    _cfgimpl_owner = default_owner
    _cfgimpl_toplevel = None

    def __init__(self, descr, parent=None, **overrides):
        """ Configuration option management master class

        :param descr: describes the configuration schema
        :type descr: an instance of ``option.OptionDescription``
        :param overrides: can be used to set different default values
                          (see method ``override``)
        :param parent: is None if the ``Config`` is root parent Config
                       otherwise
        :type parent: ``Config``
        """
        self._cfgimpl_descr = descr
        self._cfgimpl_value_owners = {}
        self._cfgimpl_parent = parent
        # private copy: enabling/disabling a property on one Config must not
        # leak into every other Config through the shared class attribute
        self._cfgimpl_properties = list(self._cfgimpl_properties)
        # `Config()` indeed is in charge of the `Option()`'s values
        self._cfgimpl_values = {}
        self._cfgimpl_previous_values = {}
        # warnings are a great idea, let's make up a better use of it
        self._cfgimpl_warnings = []
        self._cfgimpl_toplevel = self._cfgimpl_get_toplevel()
        # `freeze()` allows us to carry out this calculation again if necessary
        self._cfgimpl_frozen = self._cfgimpl_toplevel._cfgimpl_frozen
        self._cfgimpl_build(overrides)

    def _validate_duplicates(self, children):
        """checks that no two children of the schema share the same name

        :type children: list of `Option` or `OptionDescription`
        :raises ConflictConfigError: on the first name seen twice
        """
        seen = set()
        for child in children:
            if child._name in seen:
                raise ConflictConfigError('duplicate option name: '
                    '{0}'.format(child._name))
            seen.add(child._name)

    def _cfgimpl_build(self, overrides):
        """
        - builds the config object from the schema
        - settles various default values for options

        :param overrides: dict of options name:default values
        """
        self._validate_duplicates(self._cfgimpl_descr._children)
        for child in self._cfgimpl_descr._children:
            if isinstance(child, Option):
                if child.is_multi():
                    childdef = Multi(copy(child.getdefault()), config=self,
                                     child=child)
                    self._cfgimpl_values[child._name] = childdef
                    self._cfgimpl_previous_values[child._name] = list(childdef)
                    # one owner per item of the multi value
                    self._cfgimpl_value_owners[child._name] = \
                        ['default'] * len(child.getdefault())
                else:
                    childdef = child.getdefault()
                    self._cfgimpl_values[child._name] = childdef
                    self._cfgimpl_previous_values[child._name] = childdef
                    self._cfgimpl_value_owners[child._name] = 'default'
            elif isinstance(child, OptionDescription):
                self._validate_duplicates(child._children)
                self._cfgimpl_values[child._name] = Config(child, parent=self)
        self.override(overrides)

    def cfgimpl_update(self):
        """dynamically adds `Option()` or `OptionDescription()`

        Only children of the schema that have no value yet are added;
        existing values are left untouched.
        """
        # FIXME this is an update for new options in the schema only
        # see the update_child() method of the descr object
        for child in self._cfgimpl_descr._children:
            if child._name in self._cfgimpl_values:
                continue
            if isinstance(child, Option):
                if child.is_multi():
                    self._cfgimpl_values[child._name] = Multi(
                        copy(child.getdefault()), config=self, child=child)
                    self._cfgimpl_value_owners[child._name] = \
                        ['default'] * len(child.getdefault())
                else:
                    self._cfgimpl_values[child._name] = \
                        copy(child.getdefault())
                    self._cfgimpl_value_owners[child._name] = 'default'
            elif isinstance(child, OptionDescription):
                self._cfgimpl_values[child._name] = Config(child, parent=self)

    def override(self, overrides):
        """
        overrides default values. This marks the overridden values as defaults.

        :param overrides: is a dictionary of path strings to values.
        """
        # .items() works on both python 2 and python 3
        for name, value in overrides.items():
            homeconfig, name = self._cfgimpl_get_home_by_path(name)
            homeconfig.setoption(name, value, 'default')

    def cfgimpl_set_owner(self, owner):
        ":param owner: sets the default value for owner at the Config level"
        self._cfgimpl_owner = owner
        # propagate to every nested Config
        for child in self._cfgimpl_descr._children:
            if isinstance(child, OptionDescription):
                self._cfgimpl_values[child._name].cfgimpl_set_owner(owner)

    # ____________________________________________________________
    # properties methods
    def _cfgimpl_has_properties(self):
        "has properties means the Config's properties attribute is not empty"
        return bool(self._cfgimpl_properties)

    def _cfgimpl_has_property(self, propname):
        """has property propname in the Config's properties attribute

        :param propname: string which is the name of the property"""
        return propname in self._cfgimpl_properties

    def cfgimpl_enable_property(self, propname):
        """puts property propname in the Config's properties attribute

        :raises MethodCallError: when called on a non-root Config
        """
        # identity test on purpose: `!= None` would go through
        # Config.__ne__ / __eq__ and call getkey() on None
        if self._cfgimpl_parent is not None:
            raise MethodCallError("this method root_hide() shall not be"
                                  "used with non-root Config() object")
        if propname not in self._cfgimpl_properties:
            self._cfgimpl_properties.append(propname)

    def cfgimpl_disable_property(self, propname):
        """deletes property propname in the Config's properties attribute

        :raises MethodCallError: when called on a non-root Config
        """
        if self._cfgimpl_parent is not None:
            raise MethodCallError("this method root_hide() shall not be"
                                  "used with non-root Config() object")
        if self._cfgimpl_has_property(propname):
            self._cfgimpl_properties.remove(propname)

    # ____________________________________________________________
    # attribute methods
    def __setattr__(self, name, value):
        """attribute notation mechanism for the setting of the value of an
        option

        Names starting with ``_cfgimpl_`` are internal state and are stored
        directly in the instance dictionary; dotted names are delegated to
        the Config that holds the last path component.
        """
        if name.startswith('_cfgimpl_'):
            self.__dict__[name] = value
            return
        if '.' in name:
            homeconfig, name = self._cfgimpl_get_home_by_path(name)
            return setattr(homeconfig, name, value)
        child = getattr(self._cfgimpl_descr, name)
        if type(child) is not SymLinkOption:
            self._validate(name, child)
        self.setoption(name, value, self._cfgimpl_owner)

    def _validate(self, name, opt_or_descr):
        """validation for the setattr and the getattr

        :raises TypeError: if `opt_or_descr` is neither an `Option` nor an
                           `OptionDescription`
        :raises PropertiesOptionError: if the option carries at least one
                           property that is enabled on the root Config
        """
        apply_requires(opt_or_descr, self)
        if not isinstance(opt_or_descr, (Option, OptionDescription)):
            raise TypeError('Unexpected object: {0}'.format(repr(opt_or_descr)))
        toplevel = self._cfgimpl_toplevel
        # build a new list instead of removing items from the list being
        # iterated, which silently skipped every other element
        properties = [proper for proper in opt_or_descr.properties
                      if toplevel._cfgimpl_has_property(proper)]
        if properties:
            raise PropertiesOptionError("trying to access"
                    " to an option named: {0} with properties"
                    " {1}".format(name, str(properties)), properties)

    def _is_empty(self, opt):
        """convenience method to know if an option is empty

        A multi option is empty when its value is an empty list or contains
        None; any other option is empty when its value is None.
        """
        value = self._cfgimpl_values[opt._name]
        if opt.is_multi():
            return value == [] or None in value
        return value is None

    def __getattr__(self, name):
        """attribute notation mechanism for accessing the value of an option

        :raises AttributeError: for unknown names and for missing
                                ``_cfgimpl_`` internals
        :raises PropertiesOptionError: see `_validate`
        :raises MandatoryError: for an empty mandatory option
        :raises ConfigError: when a callback returns an invalid value
        """
        # special attributes: if it were in __dict__ (or on the class) it
        # would have been found already. This has to be checked first,
        # otherwise looking up self._cfgimpl_descr below recurses forever
        # when the instance is not initialized yet.
        if name.startswith('_cfgimpl_'):
            raise AttributeError("%s object has no attribute %s" %
                                 (self.__class__, name))
        # attribute access by passing a path,
        # for instance getattr(self, "creole.general.family.adresse_ip_eth0")
        if '.' in name:
            homeconfig, name = self._cfgimpl_get_home_by_path(name)
            return getattr(homeconfig, name)
        opt_or_descr = getattr(self._cfgimpl_descr, name)
        # symlink options
        if type(opt_or_descr) is SymLinkOption:
            return getattr(self, opt_or_descr.path)
        if name not in self._cfgimpl_values:
            raise AttributeError("%s object has no attribute %s" %
                                 (self.__class__, name))
        self._validate(name, opt_or_descr)
        if not isinstance(opt_or_descr, OptionDescription):
            # options with callbacks (fill or auto)
            if opt_or_descr.has_callback():
                value = self._cfgimpl_values[name]
                if (not opt_or_descr.is_frozen() or
                        not opt_or_descr.is_forced_on_freeze()) \
                        and value is not None:
                    if opt_or_descr.is_multi():
                        # a multi is complete only without any None item
                        if None not in value:
                            return value
                    else:
                        return value
                result = carry_out_calculation(name,
                            callback=opt_or_descr.getcallback(),
                            callback_params=opt_or_descr.getcallback_params(),
                            config=self._cfgimpl_get_toplevel())
                # this result **shall not** be a list
                # for example, [1, 2, 3, None] -> [1, 2, 3, result]
                if isinstance(result, list):
                    raise ConfigError('invalid calculated value returned'
                        ' for option {0} : shall not be a list'.format(name))
                if result is not None and not opt_or_descr._validate(result):
                    raise ConfigError('invalid calculated value returned'
                        ' for option {0}'.format(name))
                if opt_or_descr.is_multi():
                    if value == []:
                        _result = Multi([result], value.config, value.child)
                    else:
                        # fill the holes (None items) with the calculation
                        _result = Multi([], value.config, value.child)
                        for val in value:
                            if val is None:
                                val = result
                            _result.append(val)
                else:
                    _result = result
                return _result

            # mandatory options
            homeconfig = self._cfgimpl_get_toplevel()
            mandatory = homeconfig._cfgimpl_mandatory
            if opt_or_descr.is_mandatory() and mandatory:
                if self._is_empty(opt_or_descr) and \
                        opt_or_descr.is_empty_by_default():
                    raise MandatoryError("option: {0} is mandatory "
                                         "and shall have a value".format(name))
            # frozen and force default
            if opt_or_descr.is_forced_on_freeze():
                return opt_or_descr.getdefault()

        return self._cfgimpl_values[name]

    def unwrap_from_name(self, name):
        """convenience method to extract and Option() object from the Config()
        **and it is slow**: it recursively searches into the namespaces

        The first accessible path having `name` as one of its components
        wins.

        :returns: Option()
        :raises NotFoundError: if no accessible path matches
        """
        # only the matching path is unwrapped: unwrapping every path up
        # front is wasted work and fails on paths that are not reachable
        for path in self.getpaths():
            if name in path.split("."):
                return self.unwrap_from_path(path)
        raise NotFoundError("name: {0} not found".format(name))

    def unwrap_from_path(self, path):
        """convenience method to extract and Option() object from the Config()
        and it is **fast**: finds the option directly in the appropriate
        namespace

        :returns: Option()
        """
        if '.' in path:
            homeconfig, path = self._cfgimpl_get_home_by_path(path)
            return getattr(homeconfig._cfgimpl_descr, path)
        return getattr(self._cfgimpl_descr, path)

    def __delattr__(self, name):
        """if you use delattr you are responsible for all bad things happening

        Resets the option `name` to its ``default`` attribute and its owner
        to 'default'.

        :raises AttributeError: if `name` is a subgroup or is unknown to the
                                schema
        """
        if name.startswith('_cfgimpl_'):
            del self.__dict__[name]
            return
        # check first, modify afterwards: a failing delattr shall not leave
        # a half-reset option behind
        opt = getattr(self._cfgimpl_descr, name)
        if isinstance(opt, OptionDescription):
            raise AttributeError("can't option subgroup")
        self._cfgimpl_value_owners[name] = 'default'
        self._cfgimpl_values[name] = getattr(opt, 'default', None)

    def setoption(self, name, value, who=None):
        """effectively modifies the value of an Option()
        (typically called by the __setattr__)

        :param name: name of the option (not a path)
        :param value: new value; a plain list is wrapped into a `Multi` for
                      a multi option when `who` is given
        :param who: is an owner's name
                    who is **not necessarily** a owner, because it cannot be a
                    list
        :type who: string
        :raises ConfigError: if a multi option gets a value that is neither
                             a list nor a `Multi`
        :raises TypeError: if a default is set on an option with a callback
        """
        child = getattr(self._cfgimpl_descr, name)
        is_symlink = type(child) is SymLinkOption
        if who is None:
            if child.is_multi():
                newowner = [self._cfgimpl_owner] * len(value)
            else:
                newowner = self._cfgimpl_owner
        elif not is_symlink:
            if child.is_multi():
                if type(value) is not Multi:
                    if type(value) is list:
                        value = Multi(value, self, child)
                    else:
                        raise ConfigError("invalid value for option:"
                                   " {0} that is set to multi".format(name))
                newowner = [who] * len(value)
            else:
                newowner = who
        if is_symlink:
            # a symlink is resolved from the root of the configuration
            homeconfig = self._cfgimpl_get_toplevel()
            child.setoption(homeconfig, value, who)
            return
        if child.has_callback() and who == 'default':
            raise TypeError("trying to set a value to an option "
                "wich has a callback: {0}".format(name))
        child.setoption(self, value, who)
        if value is None and who != 'default' and not child.is_multi():
            # setting None gives the option back to its default
            child.setowner(self, 'default')
            self._cfgimpl_values[name] = copy(child.getdefault())
        elif value == [] and who != 'default' and child.is_multi():
            # same thing for a multi option set to an empty list
            child.setowner(self, ['default'] * len(child.getdefault()))
            self._cfgimpl_values[name] = Multi(copy(child.getdefault()),
                                               config=self, child=child)
        else:
            child.setowner(self, newowner)

    def set(self, **kwargs):
        """
        "do what I mean"-interface to option setting. Searches all paths
        starting from that config for matches of the optional arguments
        and sets the found option if the match is not ambiguous.

        :param kwargs: dict of name strings to values.
        :raises AmbigousOptionError: if several paths end with a given name
        :raises NoMatchingOptionFound: if no path ends with a given name
        """
        all_paths = [p.split(".") for p in self.getpaths(allpaths=True)]
        for key, value in kwargs.items():
            key_p = key.split('.')
            candidates = [p for p in all_paths if p[-len(key_p):] == key_p]
            if len(candidates) == 1:
                name = '.'.join(candidates[0])
                homeconfig, name = self._cfgimpl_get_home_by_path(name)
                try:
                    # access check: hidden or disabled options raise here
                    getattr(homeconfig, name)
                except MandatoryError:
                    # an empty mandatory option is exactly what is being
                    # filled in
                    pass
                homeconfig.setoption(name, value, self._cfgimpl_owner)
            elif len(candidates) > 1:
                raise AmbigousOptionError(
                    'more than one option that ends with %s' % (key, ))
            else:
                raise NoMatchingOptionFound(
                    'there is no option that matches %s'
                    ' or the option is hidden or disabled'% (key, ))

    def get(self, name):
        """
        same as a find_first() method in a config that has identical names
        that is : Returns the first item of an option named 'name'

        much like the attribute access way, except that
        the search for the option is performed recursively in the whole
        configuration tree.
        **carefull**: very slow !

        :returns: option value.
        :raises NotFoundError: if no path ends with `name`
        """
        for path in self.getpaths(allpaths=True):
            if path.split('.')[-1] == name:
                # errors raised by the access itself are propagated untouched
                return getattr(self, path)
        raise NotFoundError("option {0} not found in config".format(name))

    def _cfgimpl_get_home_by_path(self, path):
        """:returns: tuple (config, name)"""
        steps = path.split('.')
        home = self
        for step in steps[:-1]:
            home = getattr(home, step)
        return home, steps[-1]

    def _cfgimpl_get_toplevel(self):
        ":returns: root config"
        root = self
        while root._cfgimpl_parent is not None:
            root = root._cfgimpl_parent
        return root

    def _cfgimpl_get_path(self):
        "the path in the attribute access meaning."
        subpath = []
        obj = self
        while obj._cfgimpl_parent is not None:
            subpath.insert(0, obj._cfgimpl_descr._name)
            obj = obj._cfgimpl_parent
        return ".".join(subpath)

    # ______________________________________________________________________
    def cfgimpl_previous_value(self, path):
        "returns the stored previous value of the option found at `path`"
        home, name = self._cfgimpl_get_home_by_path(path)
        return home._cfgimpl_previous_values[name]

    def get_previous_value(self, name):
        "for the time being, only the previous Option's value is accessible"
        return self._cfgimpl_previous_values[name]

    # ______________________________________________________________________
    def add_warning(self, warning):
        "Config implements its own warning pile. Could be useful"
        self._cfgimpl_get_toplevel()._cfgimpl_warnings.append(warning)

    def get_warnings(self):
        "Config implements its own warning pile"
        return self._cfgimpl_get_toplevel()._cfgimpl_warnings

    # ____________________________________________________________
    # Config()'s status
    def cfgimpl_freeze(self):
        "cannot modify the frozen `Option`'s"
        rootconfig = self._cfgimpl_get_toplevel()
        rootconfig._cfgimpl_frozen = True
        self._cfgimpl_frozen = True

    def cfgimpl_unfreeze(self):
        "can modify the Options that are frozen"
        rootconfig = self._cfgimpl_get_toplevel()
        rootconfig._cfgimpl_frozen = False
        self._cfgimpl_frozen = False

    def is_frozen(self):
        "freeze flag at Config level"
        rootconfig = self._cfgimpl_get_toplevel()
        return rootconfig._cfgimpl_frozen

    def cfgimpl_read_only(self):
        """convenience method: freezes the config, disables the 'hidden'
        property, enables the 'disabled' property and turns the mandatory
        check on"""
        self.cfgimpl_freeze()
        rootconfig = self._cfgimpl_get_toplevel()
        rootconfig.cfgimpl_disable_property('hidden')
        rootconfig.cfgimpl_enable_property('disabled')
        rootconfig._cfgimpl_mandatory = True

    def cfgimpl_read_write(self):
        """convenience method: freezes the config, enables both the 'hidden'
        and the 'disabled' properties and turns the mandatory check off"""
        self.cfgimpl_freeze()
        rootconfig = self._cfgimpl_get_toplevel()
        rootconfig.cfgimpl_enable_property('hidden')
        rootconfig.cfgimpl_enable_property('disabled')
        rootconfig._cfgimpl_mandatory = False

    def cfgimpl_non_mandatory(self):
        """mandatory at the Config level means that the Config raises an error
        if a mandatory option is found

        :raises MethodCallError: when called on a non-root Config
        """
        # identity test on purpose, see cfgimpl_enable_property
        if self._cfgimpl_parent is not None:
            raise MethodCallError("this method root_mandatory shall"
                                  " not be used with non-root Confit() object")
        rootconfig = self._cfgimpl_get_toplevel()
        rootconfig._cfgimpl_mandatory = False

    def cfgimpl_mandatory(self):
        """mandatory at the Config level means that the Config raises an error
        if a mandatory option is found

        :raises MethodCallError: when called on a non-root Config
        """
        if self._cfgimpl_parent is not None:
            raise MethodCallError("this method root_mandatory shall"
                                  " not be used with non-root Confit() object")
        rootconfig = self._cfgimpl_get_toplevel()
        rootconfig._cfgimpl_mandatory = True

    def is_mandatory(self):
        "all mandatory Options shall have a value"
        rootconfig = self._cfgimpl_get_toplevel()
        return rootconfig._cfgimpl_mandatory

    # ____________________________________________________________
    def getkey(self):
        "delegates the computation of the comparison key to the schema"
        return self._cfgimpl_descr.getkey(self)

    def __hash__(self):
        return hash(self.getkey())

    def __eq__(self, other):
        "Config comparison"
        # comparing with anything else (None included) shall not blow up
        if not isinstance(other, Config):
            return NotImplemented
        return self.getkey() == other.getkey()

    def __ne__(self, other):
        "Config comparison"
        equal = self.__eq__(other)
        if equal is NotImplemented:
            return equal
        return not equal

    # ______________________________________________________________________
    def __iter__(self):
        """iteration only on Options (not OptionDescriptions)

        Options whose value cannot be read are silently skipped.
        """
        for child in self._cfgimpl_descr._children:
            if not isinstance(child, Option):
                continue
            try:
                value = getattr(self, child._name)
            except Exception:
                continue  # option with properties
            # the yield is kept out of the try block so that closing the
            # generator (GeneratorExit) is never swallowed
            yield child._name, value

    def iter_groups(self, group_type=None):
        """iteration on OptionDescriptions

        :param group_type: restricts the iteration to that group type, None
                           means every known group type
        :raises TypeError: if `group_type` is not a known group type
        """
        if group_type is None:
            groups = group_types
        else:
            if group_type not in group_types:
                raise TypeError("Unknown group_type: {0}".format(group_type))
            groups = [group_type]
        for child in self._cfgimpl_descr._children:
            if not isinstance(child, OptionDescription):
                continue
            try:
                if child.get_group_type() not in groups:
                    continue
                value = getattr(self, child._name)
            except Exception:
                continue  # hidden, disabled option
            # yield outside of the try block, see __iter__
            yield child._name, value

    # ______________________________________________________________________
    def __str__(self, indent=""):
        """Config's string representation

        Only the values that are not owned by 'default' are shown.
        """
        lines = []
        # names are unique within a Config, sorting on them is enough
        children = sorted(self._cfgimpl_descr._children,
                          key=lambda child: child._name)
        for child in children:
            name = child._name
            if self._cfgimpl_value_owners.get(name, None) == 'default':
                continue
            value = getattr(self, name)
            if isinstance(value, Config):
                substr = value.__str__(indent + "    ")
            else:
                substr = "%s    %s = %s" % (indent, name, value)
            if substr:
                lines.append(substr)
        if indent and not lines:
            return ''     # hide subgroups with all default values
        lines.insert(0, "%s[%s]" % (indent, self._cfgimpl_descr._name,))
        return '\n'.join(lines)

    def getpaths(self, include_groups=False, allpaths=False, mandatory=False):
        """returns a list of all paths in self, recursively, taking care of
        the context of properties (hidden/disabled)

        :param include_groups: if true, OptionDescription are included
        :param allpaths: all the options (event the properties protected ones)
        :param mandatory: includes the mandatory options
        :returns: list of all paths
        """
        paths = []
        for path in self._cfgimpl_descr.getpaths(
                include_groups=include_groups):
            try:
                getattr(self, path)
            except MandatoryError:
                if mandatory or allpaths:
                    paths.append(path)
            except PropertiesOptionError:
                if allpaths:
                    paths.append(path)  # option which have properties added
            else:
                paths.append(path)
        return paths

    def find(self, bytype=None, byname=None, byvalue=None, byattrs=None):
        """
        finds an option recursively in the config

        An option is appended to the result once per filter it matches, so
        the filters add up and an option can show up several times.

        :param bytype: Option class (BoolOption, StrOption, ...)
        :param byname: filter by Option._name
        :param byvalue: filter by the option's value
        :param byattrs: dict of option attributes (default, callback...),
                        currently not used
        :returns: list of matching Option objects
        """
        find_results = []
        for path in self.getpaths(allpaths=True):
            option = self.unwrap_from_path(path)
            if byname is not None and path.split('.')[-1] == byname:
                find_results.append(option)
            if byvalue is not None:
                try:
                    if getattr(self, path) == byvalue:
                        find_results.append(option)
                except Exception:
                    # a property restricts the access to the value
                    pass
            if bytype is not None and isinstance(option, bytype):
                find_results.append(option)
        return find_results


def make_dict(config, flatten=False):
    """export the whole config into a `dict`

    :param flatten: if true, the keys are the option names instead of the
                    whole paths
    :returns: dict of Option's name (or path) and values"""
    pathsvalues = []
    for path in config.getpaths():
        pathname = path.split('.')[-1] if flatten else path
        try:
            value = getattr(config, path)
        except Exception:
            continue  # this just a hidden or disabled option
        pathsvalues.append((pathname, value))
    return dict(pathsvalues)


def mandatory_warnings(config):
    """convenience function to trace Options that are mandatory and
    where no value has been set

    The mandatory check is forced on the root config during the iteration
    and its former state is restored afterwards.

    :returns: generator of mandatory Option's path
    """
    rootconfig = config._cfgimpl_get_toplevel()
    mandatory = rootconfig._cfgimpl_mandatory
    rootconfig._cfgimpl_mandatory = True
    try:
        for path in config._cfgimpl_descr.getpaths(include_groups=True):
            try:
                getattr(config, path)
            except MandatoryError:
                yield path
            except PropertiesOptionError:
                pass
    finally:
        # restored even if the generator is closed before exhaustion or an
        # unexpected error occurs
        rootconfig._cfgimpl_mandatory = mandatory