# -*- coding: utf-8 -*-
"pretty small and local configuration management tool"
# Copyright (C) 2012-2013 Team tiramisu (see AUTHORS for all contributors)
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
#
# The original `Config` design model is unproudly borrowed from
# the rough pypy's guys: http://codespeak.net/svn/pypy/dist/pypy/config/
# the whole pypy project is under MIT licence
# ____________________________________________________________
#from inspect import getmembers, ismethod
from tiramisu.error import PropertiesOptionError, ConfigError
from tiramisu.option import OptionDescription, Option, SymLinkOption
from tiramisu.setting import groups, Setting
from tiramisu.value import Values
from tiramisu.i18n import _


class SubConfig(object):
    "sub configuration management entry"
    __slots__ = ('_cfgimpl_context', '_cfgimpl_descr')

    def __init__(self, descr, context):
        """ Configuration option management master class

        :param descr: describes the configuration schema
        :type descr: an instance of ``option.OptionDescription``
        :param context: the current root config
        :type context: `Config`
        :raises ValueError: if `descr` is not an ``OptionDescription`` or
                            `context` is not a ``SubConfig``
        """
        # main option description
        if not isinstance(descr, OptionDescription):
            raise ValueError(_('descr must be an optiondescription, not {0}'
                               '').format(type(descr)))
        self._cfgimpl_descr = descr
        # the context is the config that owns the values and the settings
        if not isinstance(context, SubConfig):
            raise ValueError('context must be a SubConfig')
        self._cfgimpl_context = context

    def cfgimpl_reset_cache(self, only_expired=False,
                            only=('values', 'settings')):
        "delegates the cache reset to the context"
        self.cfgimpl_get_context().cfgimpl_reset_cache(only_expired, only)

    def cfgimpl_get_consistancies(self):
        ":returns: the `_consistancies` of the context's description"
        return self.cfgimpl_get_context().cfgimpl_get_description()._consistancies

    def cfgimpl_get_home_by_path(self, path, force_permissive=False,
                                 force_properties=None):
        """walks every step of a dotted path except the last one

        :param path: dotted path, for instance "family.subfamily.option"
        :returns: tuple (config, name) where `config` is the object reached
                  by the walk and `name` the last step of the path
        """
        steps = path.split('.')
        config = self
        for step in steps[:-1]:
            config = config._getattr(step,
                                     force_permissive=force_permissive,
                                     force_properties=force_properties)
        return config, steps[-1]

    def __hash__(self):
        return hash(self.cfgimpl_get_description().optimpl_getkey(self))

    def __eq__(self, other):
        "Config comparison"
        if not isinstance(other, Config):
            return False
        return self.cfgimpl_get_description().optimpl_getkey(self) == \
            other.cfgimpl_get_description().optimpl_getkey(other)

    def __ne__(self, other):
        "Config comparison"
        # strict negation of __eq__: an object that is not a Config is never
        # equal to this config, hence always different from it
        return not self == other

    # ______________________________________________________________________
    def __iter__(self):
        """Pythonesque way of parsing group's ordered options.
        iteration only on Options (not OptionDescriptions)"""
        for child in self.cfgimpl_get_description().optimpl_getchildren():
            if not isinstance(child, OptionDescription):
                try:
                    yield child._name, getattr(self, child._name)
                except GeneratorExit:
                    # the consumer closed the generator: finish quietly
                    # (raising StopIteration here is an error under PEP 479)
                    return
                except PropertiesOptionError:
                    pass  # option with properties

    def iter_all(self):
        """A way of parsing options **and** groups.
        iteration on Options and OptionDescriptions."""
        for child in self.cfgimpl_get_description().optimpl_getchildren():
            try:
                yield child._name, getattr(self, child._name)
            except GeneratorExit:
                # the consumer closed the generator: finish quietly
                return
            except PropertiesOptionError:
                pass  # option with properties

    def iter_groups(self, group_type=None):
        """iteration on groups objects only.
        All groups are returned if `group_type` is `None`, otherwise the groups
        can be filtered by categories (families, or whatever).

        :param group_type: if defined, is an instance of `groups.GroupType`
                           or `groups.MasterGroupType` that lives in
                           `setting.groups`
        :raises TypeError: if `group_type` is not a `groups.GroupType`
                           (raised when the iteration starts)
        """
        if group_type is not None:
            if not isinstance(group_type, groups.GroupType):
                raise TypeError(_("unknown group_type: {0}").format(group_type))
        for child in self.cfgimpl_get_description().optimpl_getchildren():
            if isinstance(child, OptionDescription):
                try:
                    if group_type is None or \
                            child.optimpl_get_group_type() == group_type:
                        yield child._name, getattr(self, child._name)
                except GeneratorExit:
                    # the consumer closed the generator: finish quietly
                    return
                except PropertiesOptionError:
                    pass

    # ______________________________________________________________________
    def __str__(self):
        "Config's string representation"
        lines = []
        for name, grp in self.iter_groups():
            lines.append("[%s]" % name)
        for name, value in self:
            try:
                lines.append("%s = %s" % (name, value))
            except PropertiesOptionError:
                pass
        return '\n'.join(lines)

    __repr__ = __str__

    def cfgimpl_get_context(self):
        ":returns: the context given at construction time"
        return self._cfgimpl_context

    def cfgimpl_get_description(self):
        """:returns: the option description of this config
        :raises ConfigError: if no description is set
        """
        if self._cfgimpl_descr is None:
            raise ConfigError(_('no optiondescription for this config (may be MetaConfig without meta)'))
        else:
            return self._cfgimpl_descr

    def cfgimpl_get_settings(self):
        ":returns: the `_cfgimpl_settings` of the context"
        return self.cfgimpl_get_context()._cfgimpl_settings

    def cfgimpl_get_values(self):
        ":returns: the `_cfgimpl_values` of the context"
        return self.cfgimpl_get_context()._cfgimpl_values

    # ____________________________________________________________
    # attribute methods
    def __setattr__(self, name, value):
        "attribute notation mechanism for the setting of the value of an option"
        if name.startswith('_cfgimpl_'):
            # internal attribute: stored on the object itself
            object.__setattr__(self, name, value)
            return
        self._setattr(name, value)

    def _setattr(self, name, value, force_permissive=False):
        """sets the value of the option called `name`

        :param name: option's name, or dotted path to the option
        :param value: the new value
        :param force_permissive: forwarded to the values' `setitem` (and to
                                 the path walk when `name` is a dotted path)
        """
        if '.' in name:
            homeconfig, name = self.cfgimpl_get_home_by_path(
                name, force_permissive=force_permissive)
            # keep force_permissive for the final assignment too
            return homeconfig._setattr(name, value,
                                       force_permissive=force_permissive)
        child = getattr(self.cfgimpl_get_description(), name)
        if not isinstance(child, SymLinkOption):
            self.cfgimpl_get_values().setitem(child, value,
                                              force_permissive=force_permissive)
        else:
            # a symlink is resolved to the path of the option it points to,
            # and the value is set from the context
            context = self.cfgimpl_get_context()
            path = context.cfgimpl_get_description().optimpl_get_path_by_opt(child._opt)
            context._setattr(path, value, force_permissive=force_permissive)

    def __delattr__(self, name):
        "deletes the entry of the option called `name` from the values"
        child = getattr(self.cfgimpl_get_description(), name)
        del(self.cfgimpl_get_values()[child])

    def __getattr__(self, name):
        return self._getattr(name)

    def _getattr(self, name, force_permissive=False, force_properties=None,
                 validate=True):
        """
        attribute notation mechanism for accessing the value of an option
        :param name: attribute name
        :return: option's value if name is an option name, a `SubConfig`
                 if name is an OptionDescription name
        """
        # attribute access by passing a path,
        # for instance getattr(self, "creole.general.family.adresse_ip_eth0")
        if '.' in name:
            homeconfig, name = self.cfgimpl_get_home_by_path(
                name, force_permissive=force_permissive,
                force_properties=force_properties)
            return homeconfig._getattr(name, force_permissive=force_permissive,
                                       force_properties=force_properties,
                                       validate=validate)
        # special attributes
        if name.startswith('_cfgimpl_') or name.startswith('cfgimpl_'):
            # if it were in __dict__ it would have been found already
            return object.__getattribute__(self, name)
        opt_or_descr = getattr(self.cfgimpl_get_description(), name)
        # symlink options
        if isinstance(opt_or_descr, SymLinkOption):
            context = self.cfgimpl_get_context()
            path = context.cfgimpl_get_description().optimpl_get_path_by_opt(opt_or_descr._opt)
            return context._getattr(path, validate=validate,
                                    force_properties=force_properties,
                                    force_permissive=force_permissive)
        elif isinstance(opt_or_descr, OptionDescription):
            self.cfgimpl_get_settings().validate_properties(
                opt_or_descr, True, False,
                force_permissive=force_permissive,
                force_properties=force_properties)
            return SubConfig(opt_or_descr, self.cfgimpl_get_context())
        else:
            return self.cfgimpl_get_values().getitem(
                opt_or_descr, validate=validate,
                force_properties=force_properties,
                force_permissive=force_permissive)

    def find(self, bytype=None, byname=None, byvalue=None, type_='option'):
        """
            finds a list of options recursively in the config

            :param bytype: Option class (BoolOption, StrOption, ...)
            :param byname: filter by Option._name
            :param byvalue: filter by the option's value
            :param type_: kind of result, see `_find`
            :returns: list of matching results
        """
        return self.cfgimpl_get_context()._find(bytype, byname, byvalue,
                                                first=False,
                                                type_=type_,
                                                _subpath=self.cfgimpl_get_path())

    def find_first(self, bytype=None, byname=None, byvalue=None,
                   type_='option'):
        """
            finds an option recursively in the config

            :param bytype: Option class (BoolOption, StrOption, ...)
            :param byname: filter by Option._name
            :param byvalue: filter by the option's value
            :param type_: kind of result, see `_find`
            :returns: the first matching result
        """
        return self.cfgimpl_get_context()._find(bytype, byname, byvalue,
                                                first=True,
                                                type_=type_,
                                                _subpath=self.cfgimpl_get_path())

    def _find(self, bytype, byname, byvalue, first, type_='option',
              _subpath=None, check_properties=True):
        """
        convenience method for finding an option that lives only in the subtree

        :param bytype: keep only the options that are instances of this class
        :param byname: keep only the paths equal to, or ending with, this name
        :param byvalue: keep only the options with this value
        :param first: return only one option if True, a list otherwise
        :param type_: 'option', 'path', 'context' or 'value': what is returned
                      for each match
        :param _subpath: keep only the paths that start with `_subpath + '.'`
        :param check_properties: if True, options whose value access raises
                                 `PropertiesOptionError` are skipped
        :return: find list or an exception if nothing has been found
        :raises ValueError: if `type_` is unknown
        :raises AttributeError: if nothing matches
        """
        if type_ not in ('option', 'path', 'context', 'value'):
            raise ValueError(_('unknown type_ type {0} for _find').format(type_))
        find_results = []
        opts, paths = self.cfgimpl_get_description()._cache_paths
        for option, path in zip(opts, paths):
            if isinstance(option, OptionDescription):
                continue
            if _subpath is not None and not path.startswith(_subpath + '.'):
                continue
            # filter by name
            if byname is not None and path != byname and \
                    not path.endswith('.' + byname):
                continue
            # `value` is only meaningful once `fetched` is True
            fetched = False
            value = None
            # filter by value
            if byvalue is not None:
                try:
                    value = getattr(self, path)
                except PropertiesOptionError:
                    # a property restricts the access of the value
                    continue
                if not value == byvalue:
                    continue
                fetched = True
            # remove option with propertyerror, ...
            if check_properties and not fetched:
                try:
                    value = getattr(self, path)
                except PropertiesOptionError:
                    # a property restricts the access of the value
                    continue
                fetched = True
            # filter by type
            if bytype is not None and not isinstance(option, bytype):
                continue
            if type_ == 'value':
                if not fetched:
                    # check_properties is False and no value filter was
                    # asked: the value has not been read yet, a
                    # PropertiesOptionError is propagated to the caller
                    value = getattr(self, path)
                retval = value
            elif type_ == 'path':
                retval = path
            elif type_ == 'option':
                retval = option
            else:  # type_ == 'context'
                retval = self.cfgimpl_get_context()
            if first:
                return retval
            find_results.append(retval)
        if find_results == []:
            #FIXME too slow
            #raise AttributeError(_("no option found in config with these criteria"))
            raise AttributeError("no option found in config with these criteria")
        return find_results

    def make_dict(self, flatten=False, _currpath=None, withoption=None,
                  withvalue=None):
        """export the whole config into a `dict`

        :param flatten: if True the keys are the option's names, otherwise
                        the keys are dotted paths
        :param _currpath: internal, list of the path's steps already walked
        :param withoption: only export the groups containing an option with
                           this name
        :param withvalue: used with `withoption`, only the options with this
                          value are considered
        :returns: dict of Option's name (or path) and values; a list of
                  (name, value) tuples when `_currpath` is not empty
        :raises ValueError: if `withvalue` is set without `withoption`
        """
        pathsvalues = []
        if _currpath is None:
            _currpath = []
        if withoption is None and withvalue is not None:
            raise ValueError(_("make_dict can't filtering with value without "
                               "option"))
        if withoption is not None:
            mypath = self.cfgimpl_get_path()
            context = self.cfgimpl_get_context()
            for path in context._find(bytype=Option,
                                      byname=withoption,
                                      byvalue=withvalue,
                                      first=False,
                                      type_='path',
                                      _subpath=mypath):
                # path of the group that contains the matching option
                path = '.'.join(path.split('.')[:-1])
                opt = self.cfgimpl_get_context().cfgimpl_get_description().optimpl_get_opt_by_path(path)
                if mypath is not None:
                    if mypath == path:
                        # the matching option is a direct child: export
                        # every child of this config (see below)
                        withoption = None
                        withvalue = None
                        break
                    else:
                        tmypath = mypath + '.'
                        if not path.startswith(tmypath):
                            raise AttributeError(_('unexpected path {0}, '
                                                   'should start with {1}'
                                                   '').format(path, mypath))
                        # make the path relative to this config
                        path = path[len(tmypath):]
                self._make_sub_dict(opt, path, pathsvalues, _currpath, flatten)
        # withoption can have been set to None in the loop above !
        if withoption is None:
            for opt in self.cfgimpl_get_description().optimpl_getchildren():
                path = opt._name
                self._make_sub_dict(opt, path, pathsvalues, _currpath, flatten)
        if _currpath == []:
            options = dict(pathsvalues)
            return options
        return pathsvalues

    def _make_sub_dict(self, opt, path, pathsvalues, _currpath, flatten):
        """appends to `pathsvalues` the (name, value) tuples of `opt`

        :param opt: an Option, or an OptionDescription (exported recursively)
        :param path: path of `opt`, used to reach a group from this config
        :param pathsvalues: list of tuples, modified in place
        :param _currpath: list of the path's steps already walked
        :param flatten: if True names are used instead of dotted paths
        """
        if isinstance(opt, OptionDescription):
            try:
                pathsvalues += getattr(self, path).make_dict(
                    flatten, _currpath + path.split('.'))
            except PropertiesOptionError:
                pass  # this just a hidden or disabled option
        else:
            try:
                value = self._getattr(opt._name)
                if flatten:
                    name = opt._name
                else:
                    name = '.'.join(_currpath + [opt._name])
                pathsvalues.append((name, value))
            except PropertiesOptionError:
                pass  # this just a hidden or disabled option

    def cfgimpl_get_path(self):
        ":returns: the path of this config's description in the context"
        descr = self.cfgimpl_get_description()
        context_descr = self.cfgimpl_get_context().cfgimpl_get_description()
        return context_descr.optimpl_get_path_by_opt(descr)


class ConfigCommon(SubConfig):
    "methods shared by `Config` and `MetaConfig`"
    __slots__ = ('_cfgimpl_values', '_cfgimpl_settings', '_cfgimpl_meta')

    def _cfgimpl_build_all_paths(self):
        "builds the cache of the description"
        self.cfgimpl_get_description().optimpl_build_cache()

    def read_only(self):
        "calls `read_only()` on the settings"
        self.cfgimpl_get_settings().read_only()

    def read_write(self):
        "calls `read_write()` on the settings"
        self.cfgimpl_get_settings().read_write()

    def getowner(self, path):
        """:param path: path of an option
        :returns: what the values' `getowner` returns for this option
        """
        opt = self.cfgimpl_get_description().optimpl_get_opt_by_path(path)
        return self.cfgimpl_get_values().getowner(opt)

    def unwrap_from_path(self, path):
        """convenience method to extract and Option() object from the Config()
        and it is **fast**: finds the option directly in the appropriate
        namespace

        :returns: Option()
        """
        if '.' in path:
            homeconfig, path = self.cfgimpl_get_home_by_path(path)
            return getattr(homeconfig.cfgimpl_get_description(), path)
        return getattr(self.cfgimpl_get_description(), path)

    def cfgimpl_get_path(self):
        ":returns: always None for this kind of config"
        return None

    def cfgimpl_get_meta(self):
        ":returns: the `_cfgimpl_meta` attribute (None if not set by a MetaConfig)"
        return self._cfgimpl_meta


# ____________________________________________________________
class Config(ConfigCommon):
    "main configuration management entry"
    __slots__ = tuple()

    def __init__(self, descr):
        """ Configuration option management master class

        :param descr: describes the configuration schema
        :type descr: an instance of ``option.OptionDescription``
        """
        self._cfgimpl_settings = Setting(self)
        self._cfgimpl_values = Values(self)
        # a Config is its own context
        super(Config, self).__init__(descr, self)
        self._cfgimpl_build_all_paths()
        self._cfgimpl_meta = None

    def cfgimpl_reset_cache(self, only_expired=False,
                            only=('values', 'settings')):
        """resets the cache of the values and/or of the settings

        :param only_expired: forwarded to the `reset_cache` methods
        :param only: the caches to reset ('values' and/or 'settings')
        """
        if 'values' in only:
            self.cfgimpl_get_values().reset_cache(only_expired=only_expired)
        if 'settings' in only:
            self.cfgimpl_get_settings().reset_cache(only_expired=only_expired)


class MetaConfig(ConfigCommon):
    "config that groups several children configs"
    __slots__ = ('_cfgimpl_children',)

    def __init__(self, children, meta=True):
        """
        :param children: list of configs
        :param meta: if True, every child must be a `ConfigCommon`, must not
                     already have a meta, must share the same description as
                     the other children, and gets this object as its meta
        :raises ValueError: if `children` is not a list or if one of the
                            checks above fails
        """
        if not isinstance(children, list):
            raise ValueError(_("metaconfig's children must be a list"))
        self._cfgimpl_descr = None
        if meta:
            for child in children:
                if not isinstance(child, ConfigCommon):
                    # format *after* the translation, otherwise the lookup
                    # is done with the already formatted message
                    raise ValueError(_("metaconfig's children must be Config, not {0}"
                                       "").format(type(child)))
                if self._cfgimpl_descr is None:
                    self._cfgimpl_descr = child.cfgimpl_get_description()
                elif self._cfgimpl_descr is not child.cfgimpl_get_description():
                    raise ValueError(_('all config in MetaConfig must have same '
                                       'optiondescription'))
                if child.cfgimpl_get_meta() is not None:
                    raise ValueError(_("child has already a metaconfig's"))
                child._cfgimpl_meta = self

        self._cfgimpl_children = children
        self._cfgimpl_settings = Setting(self)
        self._cfgimpl_values = Values(self)
        self._cfgimpl_meta = None

    def cfgimpl_get_context(self):
        "a MetaConfig is its own context"
        return self

    def cfgimpl_reset_cache(self, only_expired=False,
                            only=('values', 'settings')):
        """resets the cache of the values and/or of the settings, then the
        cache of every child

        :param only_expired: forwarded to the `reset_cache` methods
        :param only: the caches to reset ('values' and/or 'settings')
        """
        if 'values' in only:
            self.cfgimpl_get_values().reset_cache(only_expired=only_expired)
        if 'settings' in only:
            self.cfgimpl_get_settings().reset_cache(only_expired=only_expired)
        for child in self._cfgimpl_children:
            child.cfgimpl_reset_cache(only_expired=only_expired, only=only)

    def set_contexts(self, path, value):
        """sets `value` for `path` in every child (recursively for the
        children that are MetaConfig); children that raise
        `PropertiesOptionError` are ignored
        """
        for child in self._cfgimpl_children:
            try:
                if not isinstance(child, MetaConfig):
                    setattr(child, path, value)
                else:
                    child.set_contexts(path, value)
            except PropertiesOptionError:
                pass

    def find_first_contexts(self, byname=None, bypath=None, byvalue=None,
                            type_='context'):
        """collects one result per child matching the criteria

        :param byname: option's name, resolved to a path with `_find` when
                       `bypath` is not given and a description is available
        :param bypath: option's path
        :param byvalue: keep only the children where the option has this value
        :param type_: forwarded to the children's `find_first`
        :returns: list of results; children that raise `AttributeError`
                  are ignored
        """
        ret = []
        try:
            if bypath is None and byname is not None and \
                    self.cfgimpl_get_description() is not None:
                bypath = self._find(bytype=None, byvalue=None, byname=byname,
                                    first=True, type_='path',
                                    check_properties=False)
        except ConfigError:
            # no description for this MetaConfig: search by name in children
            pass
        for child in self._cfgimpl_children:
            try:
                if not isinstance(child, MetaConfig):
                    if bypath is not None:
                        if byvalue is not None:
                            if getattr(child, bypath) == byvalue:
                                ret.append(child)
                        else:
                            # only checks that the access does not raise
                            getattr(child, bypath)
                            ret.append(child)
                    else:
                        ret.append(child.find_first(byname=byname,
                                                    byvalue=byvalue,
                                                    type_=type_))
                else:
                    ret.extend(child.find_first_contexts(byname=byname,
                                                         bypath=bypath,
                                                         byvalue=byvalue,
                                                         type_=type_))
            except AttributeError:
                pass
        return ret


def mandatory_warnings(config):
    """convenience function to trace Options that are mandatory and
    where no value has been set

    :returns: generator of mandatory Option's path

    """
    #if value in cache, properties are not calculated
    config.cfgimpl_reset_cache(only=('values',))
    for path in config.cfgimpl_get_description().optimpl_getpaths(include_groups=True):
        try:
            config._getattr(path, force_properties=('mandatory',))
        except PropertiesOptionError as err:
            if err.proptype == ['mandatory']:
                yield path
    # the values read with the forced property must not stay in the cache
    config.cfgimpl_reset_cache(only=('values',))