
559 lines
22 KiB

# -*- coding: utf-8 -*-
"pretty small and local configuration management tool"
# Copyright (C) 2012-2013 Team tiramisu (see AUTHORS for all contributors)
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
# The original `Config` design model is unproudly borrowed from
# the rough pypy's guys: http://codespeak.net/svn/pypy/dist/pypy/config/
# the whole pypy projet is under MIT licence
# ____________________________________________________________
from copy import copy
from inspect import getmembers, ismethod
from tiramisu.error import (PropertiesOptionError, ConfigError, NotFoundError,
AmbigousOptionError, ConflictConfigError, NoMatchingOptionFound,
MandatoryError, MethodCallError, NoValueReturned)
from tiramisu.option import (OptionDescription, Option, SymLinkOption,
from tiramisu.setting import groups, owners, Setting
from tiramisu.value import Values
# ____________________________________________________________
class Config(object):
"main configuration management entry"
_cfgimpl_toplevel = None
def __init__(self, descr, parent=None, context=None):
""" Configuration option management master class
:param descr: describes the configuration schema
:type descr: an instance of ``option.OptionDescription``
:param parent: is None if the ``Config`` is root parent Config otherwise
:type parent: ``Config``
:param context: the current root config
:type context: `Config`
# main option description
self._cfgimpl_descr = descr
# sub option descriptions
self._cfgimpl_subconfigs = {}
self._cfgimpl_parent = parent
if context is None:
self._cfgimpl_context = self
self._cfgimpl_context = context
if parent is None:
self._cfgimpl_settings = Setting()
self._cfgimpl_values = Values(self._cfgimpl_context)
if context is None:
raise ConfigError("cannot find a value for this config")
self._cfgimpl_settings = None
self._cfgimpl_values = None
"warnings are a great idea, let's make up a better use of it"
self._cfgimpl_warnings = []
self._cfgimpl_toplevel = self._cfgimpl_get_toplevel()
# some api members shall not be used as option's names !
methods = getmembers(self, ismethod)
self._cfgimpl_slots = [key for key, value in methods
if not key.startswith("_")]
def cfgimpl_get_settings(self):
return self._cfgimpl_context._cfgimpl_settings
def cfgimpl_set_settings(self, settings):
if not isinstance(settings, Setting):
raise ConfigError("setting not allowed")
self._cfgimpl_context._cfgimpl_settings = settings
def cfgimpl_get_description(self):
return self._cfgimpl_descr
def cfgimpl_get_value(self, path):
"""same as multiple getattrs
:param path: list or tuple of path
example : ``path = (sub_conf, opt)`` makes a getattr of sub_conf, then
a getattr of opt, and then returns the opt's value.
:returns: subconf or option's value
subpaths = list(path)
subconf_or_opt = self
for subpath in subpaths:
subconf_or_opt = getattr(subconf_or_opt, subpath)
return subconf_or_opt
def _validate_duplicates(self, children):
"""duplicates Option names in the schema
:type children: list of `Option` or `OptionDescription`
duplicates = []
for dup in children:
if dup._name not in duplicates:
raise ConflictConfigError('duplicate option name: '
def _cfgimpl_build(self):
- builds the config object from the schema
- settles various default values for options
if self._cfgimpl_descr.group_type == groups.master:
mastername = self._cfgimpl_descr._name
masteropt = getattr(self._cfgimpl_descr, mastername)
self._cfgimpl_context._cfgimpl_values.masters[masteropt] = []
for child in self._cfgimpl_descr._children:
if isinstance(child, OptionDescription):
self._cfgimpl_subconfigs[child] = Config(child, parent=self,
if child._name in self._cfgimpl_slots:
raise NameError("invalid name for the option:"
" {0}".format(child._name))
if (self._cfgimpl_descr.group_type == groups.master and
child != masteropt):
self._cfgimpl_context._cfgimpl_values.slaves[child] = masteropt
# ____________________________________________________________
# attribute methods
def __setattr__(self, name, value):
"attribute notation mechanism for the setting of the value of an option"
if name.startswith('_cfgimpl_'):
self.__dict__[name] = value
if '.' in name:
homeconfig, name = self._cfgimpl_get_home_by_path(name)
return setattr(homeconfig, name, value)
if type(getattr(self._cfgimpl_descr, name)) != SymLinkOption:
self._validate(name, getattr(self._cfgimpl_descr, name))
if name in self._cfgimpl_slots:
raise NameError("invalid name for the option:"
" {0}".format(name))
self.setoption(name, value)
def _validate(self, name, opt_or_descr, permissive=False):
"validation for the setattr and the getattr"
apply_requires(opt_or_descr, self, permissive=permissive)
if not isinstance(opt_or_descr, Option) and \
not isinstance(opt_or_descr, OptionDescription):
raise TypeError('Unexpected object: {0}'.format(repr(opt_or_descr)))
properties = copy(opt_or_descr.properties)
for proper in copy(properties):
if not self._cfgimpl_context._cfgimpl_settings.has_property(proper):
if permissive:
for perm in self._cfgimpl_context._cfgimpl_settings.permissive:
if perm in properties:
if properties != []:
raise PropertiesOptionError("trying to access"
" to an option named: {0} with properties"
" {1}".format(name, str(properties)),
def __getattr__(self, name):
return self._getattr(name)
# def fill_multi(self, opt, result, use_default_multi=False, default_multi=None):
# """fills a multi option with default and calculated values
# """
# if not isinstance(result, list):
# _result = [result]
# else:
# _result = result
# return Multi(_result, self._cfgimpl_context, opt)
def _getattr(self, name, permissive=False):
attribute notation mechanism for accessing the value of an option
:param name: attribute name
:param permissive: permissive doesn't raise some property error
(see ``permissive``)
:return: option's value if name is an option name, OptionDescription
# attribute access by passing a path,
# for instance getattr(self, "creole.general.family.adresse_ip_eth0")
if '.' in name:
homeconfig, name = self._cfgimpl_get_home_by_path(name)
return homeconfig._getattr(name, permissive)
opt_or_descr = getattr(self._cfgimpl_descr, name)
# symlink options
if type(opt_or_descr) == SymLinkOption:
rootconfig = self._cfgimpl_get_toplevel()
return getattr(rootconfig, opt_or_descr.path)
self._validate(name, opt_or_descr, permissive)
if isinstance(opt_or_descr, OptionDescription):
if opt_or_descr not in self._cfgimpl_subconfigs:
raise AttributeError("%s with name %s object has no attribute %s" %
(self.__class__, opt_or_descr._name, name))
return self._cfgimpl_subconfigs[opt_or_descr]
# special attributes
if name.startswith('_cfgimpl_'):
# if it were in __dict__ it would have been found already
return self.__dict__[name]
return self._cfgimpl_context._cfgimpl_values[opt_or_descr]
def unwrap_from_name(self, name):
"""convenience method to extract and Option() object from the Config()
**and it is slow**: it recursively searches into the namespaces
:returns: Option()
paths = self.getpaths(allpaths=True)
opts = dict([(path, self.unwrap_from_path(path)) for path in paths])
all_paths = [p.split(".") for p in self.getpaths()]
for pth in all_paths:
if name in pth:
return opts[".".join(pth)]
raise NotFoundError("name: {0} not found".format(name))
def unwrap_from_path(self, path):
"""convenience method to extract and Option() object from the Config()
and it is **fast**: finds the option directly in the appropriate
:returns: Option()
if '.' in path:
homeconfig, path = self._cfgimpl_get_home_by_path(path)
return getattr(homeconfig._cfgimpl_descr, path)
return getattr(self._cfgimpl_descr, path)
def setoption(self, name, value, who=None):
"""effectively modifies the value of an Option()
(typically called by the __setattr__)
child = getattr(self._cfgimpl_descr, name)
child.setoption(self, value)
def set(self, **kwargs):
do what I mean"-interface to option setting. Searches all paths
starting from that config for matches of the optional arguments
and sets the found option if the match is not ambiguous.
:param kwargs: dict of name strings to values.
all_paths = [p.split(".") for p in self.getpaths(allpaths=True)]
for key, value in kwargs.iteritems():
key_p = key.split('.')
candidates = [p for p in all_paths if p[-len(key_p):] == key_p]
if len(candidates) == 1:
name = '.'.join(candidates[0])
homeconfig, name = self._cfgimpl_get_home_by_path(name)
getattr(homeconfig, name)
except MandatoryError:
except Exception, e:
raise e # HiddenOptionError or DisabledOptionError
homeconfig.setoption(name, value)
elif len(candidates) > 1:
raise AmbigousOptionError(
'more than one option that ends with %s' % (key, ))
raise NoMatchingOptionFound(
'there is no option that matches %s'
' or the option is hidden or disabled' % (key, ))
def get(self, name):
same as a `find_first()` method in a config that has identical names:
it returns the first item of an option named `name`
much like the attribute access way, except that
the search for the option is performed recursively in the whole
configuration tree.
**carefull**: very slow !
:returns: option value.
paths = self.getpaths(allpaths=True)
pathsvalues = []
for path in paths:
pathname = path.split('.')[-1]
if pathname == name:
value = getattr(self, path)
return value
except Exception, e:
raise e
raise NotFoundError("option {0} not found in config".format(name))
def _cfgimpl_get_home_by_path(self, path):
""":returns: tuple (config, name)"""
path = path.split('.')
for step in path[:-1]:
self = getattr(self, step)
return self, path[-1]
def _cfgimpl_get_toplevel(self):
":returns: root config"
while self._cfgimpl_parent is not None:
self = self._cfgimpl_parent
return self
def _cfgimpl_get_path(self):
"the path in the attribute access meaning."
subpath = []
obj = self
while obj._cfgimpl_parent is not None:
subpath.insert(0, obj._cfgimpl_descr._name)
obj = obj._cfgimpl_parent
return ".".join(subpath)
# ______________________________________________________________________
def add_warning(self, warning):
"Config implements its own warning pile. Could be useful"
def get_warnings(self):
"Config implements its own warning pile"
return self._cfgimpl_get_toplevel()._cfgimpl_warnings
# ____________________________________________________________
def getkey(self):
return self._cfgimpl_descr.getkey(self)
def __hash__(self):
return hash(self.getkey())
def __eq__(self, other):
"Config comparison"
if not isinstance(other, Config):
return False
return self.getkey() == other.getkey()
def __ne__(self, other):
"Config comparison"
return not self == other
# ______________________________________________________________________
def __iter__(self):
"""Pythonesque way of parsing group's ordered options.
iteration only on Options (not OptionDescriptions)"""
for child in self._cfgimpl_descr._children:
if not isinstance(child, OptionDescription):
yield child._name, getattr(self, child._name)
pass # option with properties
def iter_groups(self, group_type=None):
"""iteration on groups objects only.
All groups are returned if `group_type` is `None`, otherwise the groups
can be filtered by categories (families, or whatever).
:param group_type: if defined, is an instance of `groups.GroupType`
or `groups.MasterGroupType` that lives in
if group_type is not None:
if not isinstance(group_type, groups.GroupType):
raise TypeError("Unknown group_type: {0}".format(group_type))
for child in self._cfgimpl_descr._children:
if isinstance(child, OptionDescription):
if group_type is not None:
if child.get_group_type() == group_type:
yield child._name, getattr(self, child._name)
yield child._name, getattr(self, child._name)
# ______________________________________________________________________
def __str__(self):
"Config's string representation"
lines = []
for name, grp in self.iter_groups():
lines.append("[%s]" % name)
for name, value in self:
lines.append("%s = %s" % (name, value))
return '\n'.join(lines)
__repr__ = __str__
def getpaths(self, include_groups=False, allpaths=False, mandatory=False):
"""returns a list of all paths in self, recursively, taking care of
the context of properties (hidden/disabled)
:param include_groups: if true, OptionDescription are included
:param allpaths: all the options (event the properties protected ones)
:param mandatory: includes the mandatory options
:returns: list of all paths
paths = []
for path in self._cfgimpl_descr.getpaths(include_groups=include_groups):
value = getattr(self, path)
except MandatoryError:
if mandatory or allpaths:
except PropertiesOptionError:
if allpaths:
paths.append(path) # option which have properties added
return paths
def _find(self, bytype, byname, byvalue, byattrs, first):
convenience method for finding an option that lives only in the subtree
:param first: return only one option if True, a list otherwise
:return: find list or an exception if nothing has been found
def _filter_by_attrs():
if byattrs is None:
return True
for key, value in byattrs.items():
if not hasattr(option, key):
return False
if getattr(option, key) != value:
return False
return True
def _filter_by_name():
if byname is None:
return True
pathname = path.split('.')[-1]
if pathname == byname:
return True
return False
def _filter_by_value():
if byvalue is None:
return True
value = getattr(self, path)
if value == byvalue:
return True
except: # a property restricts the access of the value
return False
def _filter_by_type():
if bytype is None:
return True
if isinstance(option, bytype):
return True
return False
find_results = []
paths = self.getpaths(allpaths=True)
for path in paths:
option = self.unwrap_from_path(path)
except PropertiesOptionError, err:
if not _filter_by_name():
if not _filter_by_value():
if not _filter_by_type():
if not _filter_by_attrs():
if first:
return option
if find_results == []:
raise NotFoundError("no option found in config with these criteria")
return find_results
def find(self, bytype=None, byname=None, byvalue=None, byattrs=None):
finds a list of options recursively in the config
:param bytype: Option class (BoolOption, StrOption, ...)
:param byname: filter by Option._name
:param byvalue: filter by the option's value
:param byattrs: dict of option attributes (default, callback...)
:returns: list of matching Option objects
return self._find(bytype, byname, byvalue, byattrs, first=False)
def find_first(self, bytype=None, byname=None, byvalue=None, byattrs=None):
finds an option recursively in the config
:param bytype: Option class (BoolOption, StrOption, ...)
:param byname: filter by Option._name
:param byvalue: filter by the option's value
:param byattrs: dict of option attributes (default, callback...)
:returns: list of matching Option objects
return self._find(bytype, byname, byvalue, byattrs, first=True)
def make_dict(config, flatten=False):
"""export the whole config into a `dict`
:returns: dict of Option's name (or path) and values"""
paths = config.getpaths()
pathsvalues = []
for path in paths:
if flatten:
pathname = path.split('.')[-1]
pathname = path
value = getattr(config, path)
pathsvalues.append((pathname, value))
pass # this just a hidden or disabled option
options = dict(pathsvalues)
return options
def mandatory_warnings(config):
"""convenience function to trace Options that are mandatory and
where no value has been set
:returns: generator of mandatory Option's path
FIXME : CAREFULL : not multi-user
mandatory = config._cfgimpl_context._cfgimpl_settings.mandatory
config._cfgimpl_context._cfgimpl_settings.mandatory = True
for path in config._cfgimpl_descr.getpaths(include_groups=True):
value = config._getattr(path, permissive=True)
except MandatoryError:
yield path
except PropertiesOptionError:
config._cfgimpl_context._cfgimpl_settings.mandatory = mandatory