# -*- coding: utf-8 -*-
"pretty small and local configuration management tool"
# Copyright (C) 2012-2013 Team tiramisu (see AUTHORS for all contributors)
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
#
# The original `Config` design model is unproudly borrowed from
# the rough pypy's guys: http://codespeak.net/svn/pypy/dist/pypy/config/
# the whole pypy project is under MIT licence
# ____________________________________________________________
from copy import copy
from tiramisu.error import (PropertiesOptionError, ConfigError, NotFoundError,
    AmbigousOptionError, ConflictConfigError, NoMatchingOptionFound,
    MandatoryError, MethodCallError, NoValueReturned)
from tiramisu.option import (OptionDescription, Option, SymLinkOption,
    apply_requires)
from tiramisu.setting import groups, owners, Setting
from tiramisu.value import OptionValues


# ____________________________________________________________
class Config(object):
    "main configuration management entry"
    _cfgimpl_toplevel = None

    def __init__(self, descr, parent=None, context=None):
        """ Configuration option management master class

        :param descr: describes the configuration schema
        :type descr: an instance of ``option.OptionDescription``
        :param parent: ``None`` for the root config, the parent ``Config``
                       otherwise
        :type parent: ``Config``
        :param context: the current root config (defaults to ``self``)
        :type context: ``Config``
        :raises ConfigError: if a sub-config (``parent`` given) is built
                             without a ``context``
        """
        # main option description
        self._cfgimpl_descr = descr
        # sub configs, keyed by their option description
        self._cfgimpl_subconfigs = {}
        self._cfgimpl_parent = parent
        if context is None:
            self._cfgimpl_context = self
        else:
            self._cfgimpl_context = context
        if parent is None:
            # only a root config owns the settings and the values
            self._cfgimpl_settings = Setting()
            self._cfgimpl_values = OptionValues(self._cfgimpl_context)
        else:
            if context is None:
                raise ConfigError("cannot find a value for this config")
            self._cfgimpl_settings = None
            self._cfgimpl_values = None
        # warnings are a great idea, let's make up a better use of it
        self._cfgimpl_warnings = []
        self._cfgimpl_toplevel = self._cfgimpl_get_toplevel()
        self._cfgimpl_build()

    def cfgimpl_get_settings(self):
        "returns the settings object held by the context (root) config"
        return self._cfgimpl_context._cfgimpl_settings

    def cfgimpl_set_settings(self, settings):
        """replaces the settings object held by the context (root) config

        :param settings: the new settings
        :type settings: ``Setting``
        :raises ConfigError: if `settings` is not a ``Setting`` instance
        """
        if not isinstance(settings, Setting):
            raise ConfigError("setting not allowed")
        self._cfgimpl_context._cfgimpl_settings = settings

    def _validate_duplicates(self, children):
        """checks that no two children share the same name in the schema

        :type children: list of `Option` or `OptionDescription`
        :raises ConflictConfigError: on the first duplicated name
        """
        seen = set()
        for child in children:
            if child._name in seen:
                raise ConflictConfigError('duplicate option name: '
                                          '{0}'.format(child._name))
            seen.add(child._name)

    def _cfgimpl_build(self):
        """
        - builds the config object from the schema
        - registers the master/slaves relations of a master group
        """
        descr = self._cfgimpl_descr
        self._validate_duplicates(descr._children)
        is_master = descr.group_type == groups.master
        if is_master:
            # a sub-config holds no values of its own (``_cfgimpl_values``
            # is None), the master groups are stored on the context's values
            master_groups = self._cfgimpl_context._cfgimpl_values.master_groups
            # the master option bears the same name as its group
            masteropt = getattr(descr, descr._name)
            master_groups[masteropt] = []
        for child in descr._children:
            if isinstance(child, OptionDescription):
                self._validate_duplicates(child._children)
                self._cfgimpl_subconfigs[child] = Config(
                    child, parent=self, context=self._cfgimpl_context)
            if is_master and child != masteropt:
                # every other child of a master group is a slave
                master_groups[child] = []
                master_groups[masteropt].append(child)

    # ____________________________________________________________
    # attribute methods
    def __setattr__(self, name, value):
        "attribute notation mechanism for the setting of the value of an option"
        if name.startswith('_cfgimpl_'):
            # internal attribute: stored on the instance, not an option
            self.__dict__[name] = value
            return
        if '.' in name:
            homeconfig, name = self._cfgimpl_get_home_by_path(name)
            return setattr(homeconfig, name, value)
        opt_or_descr = getattr(self._cfgimpl_descr, name)
        # symlink options are not validated here
        if type(opt_or_descr) != SymLinkOption:
            self._validate(name, opt_or_descr)
        self.setoption(name, value)

    def _validate(self, name, opt_or_descr, permissive=False):
        """validation for the setattr and the getattr

        :param name: name of the accessed option, used in the error message
        :param opt_or_descr: the accessed `Option` or `OptionDescription`
        :param permissive: if True, the properties listed in the settings'
                           ``permissive`` do not block the access
        :raises TypeError: if `opt_or_descr` is neither an `Option` nor an
                           `OptionDescription`
        :raises PropertiesOptionError: if some active properties remain
        """
        apply_requires(opt_or_descr, self, permissive=permissive)
        if not isinstance(opt_or_descr, (Option, OptionDescription)):
            raise TypeError('Unexpected object: {0}'.format(repr(opt_or_descr)))
        settings = self._cfgimpl_context._cfgimpl_settings
        properties = copy(opt_or_descr.properties)
        # keep only the properties that are currently set in the settings
        for proper in copy(properties):
            if not settings.has_property(proper):
                properties.remove(proper)
        if permissive:
            for perm in settings.permissive:
                if perm in properties:
                    properties.remove(perm)
        if properties != []:
            raise PropertiesOptionError("trying to access"
                    " to an option named: {0} with properties"
                    " {1}".format(name, str(properties)),
                    properties)

    def __getattr__(self, name):
        """fallback attribute access: resolves `name` as an option name

        ``__getattr__`` is only called when the regular lookup failed, so an
        internal ``_cfgimpl_`` name reaching this point does not exist.
        Failing right away avoids an endless recursion through
        ``self._cfgimpl_descr`` on a not yet initialised instance.
        """
        if name.startswith('_cfgimpl_'):
            raise AttributeError("%s object has no attribute %s" %
                                 (self.__class__, name))
        return self._getattr(name)

    def _getattr(self, name, permissive=False):
        """
        attribute notation mechanism for accessing the value of an option

        :param name: attribute name, or a dotted path
        :param permissive: permissive doesn't raise some property error
                          (see ``permissive``)
        :return: option's value if name is an option name, the sub-config
                 if name is an OptionDescription name
        """
        # attribute access by passing a path,
        # for instance getattr(self, "creole.general.family.adresse_ip_eth0")
        if '.' in name:
            homeconfig, name = self._cfgimpl_get_home_by_path(name)
            return homeconfig._getattr(name, permissive)
        opt_or_descr = getattr(self._cfgimpl_descr, name)
        # symlink options: resolved from the root config with their path
        if type(opt_or_descr) == SymLinkOption:
            rootconfig = self._cfgimpl_get_toplevel()
            return getattr(rootconfig, opt_or_descr.path)

        self._validate(name, opt_or_descr, permissive)
        if isinstance(opt_or_descr, OptionDescription):
            if opt_or_descr not in self._cfgimpl_subconfigs:
                raise AttributeError("%s with name %s object has no attribute %s" %
                                 (self.__class__, opt_or_descr._name, name))
            return self._cfgimpl_subconfigs[opt_or_descr]
        # special attributes
        if name.startswith('_cfgimpl_'):
            # if it were in __dict__ it would have been found already
            return self.__dict__[name]
        return self._cfgimpl_context._cfgimpl_values[opt_or_descr]

    def unwrap_from_name(self, name):
        """convenience method to extract and Option() object from the Config()
        **and it is slow**: it recursively searches into the namespaces

        The first accessible path having `name` as one of its components wins.

        :returns: Option()
        :raises NotFoundError: if no accessible path contains `name`
        """
        for path in self.getpaths():
            if name in path.split('.'):
                # only the matching path is unwrapped
                return self.unwrap_from_path(path)
        raise NotFoundError("name: {0} not found".format(name))

    def unwrap_from_path(self, path):
        """convenience method to extract and Option() object from the Config()
        and it is **fast**: finds the option directly in the appropriate
        namespace

        :returns: Option()
        """
        if '.' in path:
            homeconfig, path = self._cfgimpl_get_home_by_path(path)
            return getattr(homeconfig._cfgimpl_descr, path)
        return getattr(self._cfgimpl_descr, path)

    def setoption(self, name, value, who=None):
        """effectively modifies the value of an Option()
        (typically called by the __setattr__)

        :param name: name of the option, looked up on the description
        :param value: the new value
        :param who: unused here, kept for the callers passing it
        """
        child = getattr(self._cfgimpl_descr, name)
        child.setoption(self, value)

    def set(self, **kwargs):
        """
        "do what I mean"-interface to option setting. Searches all paths
        starting from that config for matches of the optional arguments
        and sets the found option if the match is not ambiguous.

        :param kwargs: dict of name strings to values.
        :raises AmbigousOptionError: if several paths end with a given name
        :raises NoMatchingOptionFound: if no path ends with a given name
        """
        all_paths = [p.split(".") for p in self.getpaths(allpaths=True)]
        for key, value in kwargs.items():
            key_p = key.split('.')
            candidates = [p for p in all_paths if p[-len(key_p):] == key_p]
            if len(candidates) == 1:
                name = '.'.join(candidates[0])
                homeconfig, name = self._cfgimpl_get_home_by_path(name)
                try:
                    # access check: any error but a missing mandatory value
                    # (hidden or disabled option, ...) is propagated
                    getattr(homeconfig, name)
                except MandatoryError:
                    pass
                homeconfig.setoption(name, value)
            elif len(candidates) > 1:
                raise AmbigousOptionError(
                    'more than one option that ends with %s' % (key, ))
            else:
                raise NoMatchingOptionFound(
                    'there is no option that matches %s'
                    ' or the option is hidden or disabled'% (key, ))

    def get(self, name):
        """
        same as a `find_first()` method in a config that has identical names:
        it returns the first item of an option named `name`

        much like the attribute access way, except that
        the search for the option is performed recursively in the whole
        configuration tree.
        **carefull**: very slow !

        :returns: option value.
        :raises NotFoundError: if no path ends with `name`
        """
        for path in self.getpaths(allpaths=True):
            if path.split('.')[-1] == name:
                # errors raised by the access itself are propagated as is
                return getattr(self, path)
        raise NotFoundError("option {0} not found in config".format(name))

    def _cfgimpl_get_home_by_path(self, path):
        """walks down the sub-configs along a dotted path

        :returns: tuple (config, name), the config holding the last
                  component of the path and that last component
        """
        steps = path.split('.')
        config = self
        for step in steps[:-1]:
            config = getattr(config, step)
        return config, steps[-1]

    def _cfgimpl_get_toplevel(self):
        ":returns: root config"
        config = self
        while config._cfgimpl_parent is not None:
            config = config._cfgimpl_parent
        return config

    def _cfgimpl_get_path(self):
        "the path in the attribute access meaning."
        subpath = []
        obj = self
        while obj._cfgimpl_parent is not None:
            subpath.insert(0, obj._cfgimpl_descr._name)
            obj = obj._cfgimpl_parent
        return ".".join(subpath)

    # ______________________________________________________________________
    def add_warning(self, warning):
        "Config implements its own warning pile. Could be useful"
        self._cfgimpl_get_toplevel()._cfgimpl_warnings.append(warning)

    def get_warnings(self):
        "Config implements its own warning pile"
        return self._cfgimpl_get_toplevel()._cfgimpl_warnings

    # ____________________________________________________________
    def getkey(self):
        "returns the key computed by the option description for this config"
        return self._cfgimpl_descr.getkey(self)

    def __hash__(self):
        return hash(self.getkey())

    def __eq__(self, other):
        "Config comparison"
        if not isinstance(other, Config):
            return False
        return self.getkey() == other.getkey()

    def __ne__(self, other):
        "Config comparison"
        return not self == other

    # ______________________________________________________________________
    def __iter__(self):
        """Pythonesque way of parsing group's ordered options.
        iteration only on Options (not OptionDescriptions)

        Options whose access fails (option with properties) are skipped.
        """
        for child in self._cfgimpl_descr._children:
            if isinstance(child, OptionDescription):
                continue
            try:
                value = getattr(self, child._name)
            except Exception:
                continue  # option with properties
            # yielded outside of the ``try`` so that closing the generator
            # is never swallowed by the error handling
            yield child._name, value

    def iter_groups(self, group_type=None):
        """iteration on groups objects only.
        All groups are returned if `group_type` is `None`, otherwise the groups
        can be filtered by categories (families, or whatever).

        Groups whose access fails are skipped.

        :param group_type: if defined, is an instance of `groups.GroupType`
                           or `groups.MasterGroupType` that lives in
                           `setting.groups`
        :raises TypeError: if `group_type` is not a `groups.GroupType`
        """
        if group_type is not None:
            if not isinstance(group_type, groups.GroupType):
                raise TypeError("Unknown group_type: {0}".format(group_type))
        for child in self._cfgimpl_descr._children:
            if not isinstance(child, OptionDescription):
                continue
            try:
                if (group_type is not None and
                        not child.get_group_type() == group_type):
                    continue
                subconfig = getattr(self, child._name)
            except Exception:
                continue
            # yielded outside of the ``try`` so that closing the generator
            # is never swallowed by the error handling
            yield child._name, subconfig

    # ______________________________________________________________________
    def __str__(self):
        "Config's string representation"
        lines = []
        for name, grp in self.iter_groups():
            lines.append("[%s]" % name)
        for name, value in self:
            try:
                lines.append("%s = %s" % (name, value))
            except Exception:
                # a value that cannot be formatted is left out
                pass
        return '\n'.join(lines)

    __repr__ = __str__

    def getpaths(self, include_groups=False, allpaths=False, mandatory=False):
        """returns a list of all paths in self, recursively, taking care of
        the context of properties (hidden/disabled)

        :param include_groups: if true, OptionDescription are included
        :param allpaths: all the options (event the properties protected ones)
        :param mandatory: includes the mandatory options
        :returns: list of all paths
        """
        paths = []
        for path in self._cfgimpl_descr.getpaths(include_groups=include_groups):
            try:
                getattr(self, path)
            except MandatoryError:
                if mandatory or allpaths:
                    paths.append(path)
            except PropertiesOptionError:
                if allpaths:
                    paths.append(path)  # option which have properties added
            else:
                paths.append(path)
        return paths

    def _find(self, bytype, byname, byvalue, byattrs, first):
        """
        convenience method for finding an option that lives only in the subtree

        :param bytype: Option class, or None to accept any type
        :param byname: last component of the path, or None to accept any name
        :param byvalue: option's value, or None to accept any value
        :param byattrs: dict of option attributes, or None
        :param first: return only one option if True, a list otherwise
        :return: find list or an exception if nothing has been found
        :raises NotFoundError: if no option matches the criteria
        """
        def _filter_by_attrs(option):
            if byattrs is None:
                return True
            for key, value in byattrs.items():
                if not hasattr(option, key):
                    return False
                if getattr(option, key) != value:
                    return False
            return True

        def _filter_by_name(path):
            if byname is None:
                return True
            return path.split('.')[-1] == byname

        def _filter_by_value(path):
            if byvalue is None:
                return True
            try:
                return getattr(self, path) == byvalue
            except Exception:
                # a property restricts the access of the value
                return False

        def _filter_by_type(option):
            if bytype is None:
                return True
            return isinstance(option, bytype)

        find_results = []
        for path in self.getpaths(allpaths=True):
            try:
                option = self.unwrap_from_path(path)
            except PropertiesOptionError:
                # the option lives in a group that cannot be accessed
                continue
            if not _filter_by_name(path):
                continue
            if not _filter_by_value(path):
                continue
            if not _filter_by_type(option):
                continue
            if not _filter_by_attrs(option):
                continue
            if first:
                return option
            find_results.append(option)

        if find_results == []:
            raise NotFoundError("no option found in config with these criteria")
        return find_results

    def find(self, bytype=None, byname=None, byvalue=None, byattrs=None):
        """
            finds a list of options recursively in the config

            :param bytype: Option class (BoolOption, StrOption, ...)
            :param byname: filter by Option._name
            :param byvalue: filter by the option's value
            :param byattrs: dict of option attributes (default, callback...)
            :returns: list of matching Option objects
            :raises NotFoundError: if no option matches the criteria
        """
        return self._find(bytype, byname, byvalue, byattrs, first=False)

    def find_first(self, bytype=None, byname=None, byvalue=None, byattrs=None):
        """
            finds an option recursively in the config

            :param bytype: Option class (BoolOption, StrOption, ...)
            :param byname: filter by Option._name
            :param byvalue: filter by the option's value
            :param byattrs: dict of option attributes (default, callback...)
            :returns: the first matching Option object
            :raises NotFoundError: if no option matches the criteria
        """
        return self._find(bytype, byname, byvalue, byattrs, first=True)


def make_dict(config, flatten=False):
    """export the whole config into a `dict`

    Options whose value cannot be read (hidden or disabled option) are
    left out.

    :param flatten: if True, the keys are the option names (last component
                    of the path, a later option overrides an earlier one
                    bearing the same name), the full paths otherwise
    :returns: dict of Option's name (or path) and values"""
    options = {}
    for path in config.getpaths():
        if flatten:
            pathname = path.split('.')[-1]
        else:
            pathname = path
        try:
            options[pathname] = getattr(config, path)
        except Exception:
            pass  # this just a hidden or disabled option
    return options


def mandatory_warnings(config):
    """convenience function to trace Options that are mandatory and
    where no value has been set

    The ``mandatory`` flag of the settings is forced to True while the
    generator runs and put back to its previous value when the generator
    ends, is closed or fails.

    :returns: generator of mandatory Option's path
    FIXME : CAREFULL : not multi-user
    """
    settings = config._cfgimpl_context._cfgimpl_settings
    mandatory = settings.mandatory
    settings.mandatory = True
    try:
        for path in config._cfgimpl_descr.getpaths(include_groups=True):
            try:
                config._getattr(path, permissive=True)
            except MandatoryError:
                yield path
            except PropertiesOptionError:
                pass
    finally:
        settings.mandatory = mandatory