517 lines
21 KiB
Python
517 lines
21 KiB
Python
# -*- coding: utf-8 -*-
|
|
"pretty small and local configuration management tool"
|
|
# Copyright (C) 2012-2013 Team tiramisu (see AUTHORS for all contributors)
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation; either version 2 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program; if not, write to the Free Software
|
|
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
|
#
|
|
# The original `Config` design model is unproudly borrowed from
|
|
# the rough pypy's guys: http://codespeak.net/svn/pypy/dist/pypy/config/
|
|
# the whole pypy project is under MIT licence
|
|
# ____________________________________________________________
|
|
#from inspect import getmembers, ismethod
|
|
from tiramisu.error import (PropertiesOptionError, NotFoundError,
|
|
AmbigousOptionError, NoMatchingOptionFound, MandatoryError)
|
|
from tiramisu.option import (OptionDescription, Option, SymLinkOption,
|
|
apply_requires)
|
|
from tiramisu.setting import groups, Setting
|
|
from tiramisu.value import Values
|
|
|
|
|
|
class SubConfig(object):
    """Sub configuration management entry.

    A ``SubConfig`` pairs one option description (``_cfgimpl_descr``) with
    the root config (``_cfgimpl_context``) whose settings and values it
    reads.  Options are read and written with the attribute notation; a
    name containing dots is resolved as a path through nested descriptions.
    """
    __slots__ = ('_cfgimpl_descr', '_cfgimpl_context')

    def __init__(self, descr, context):
        """Bind an option description to its root config.

        :param descr: describes the configuration schema
        :type descr: an instance of ``option.OptionDescription``
        :param context: the current root config
        :type context: `Config`
        """
        # main option description
        self._cfgimpl_descr = descr
        # root config, used to reach the settings and the values
        self._cfgimpl_context = context

    def cfgimpl_get_context(self):
        "returns the root config this object is attached to"
        return self._cfgimpl_context

    def cfgimpl_get_settings(self):
        "returns the settings object stored on the root config"
        return self._cfgimpl_context._cfgimpl_settings

    def cfgimpl_get_values(self):
        "returns the values object stored on the root config"
        return self._cfgimpl_context._cfgimpl_values

    def cfgimpl_get_description(self):
        "returns the option description handled by this object"
        return self._cfgimpl_descr

    # ____________________________________________________________
    # attribute methods
    def __setattr__(self, name, value):
        "attribute notation mechanism for the setting of the value of an option"
        if name.startswith('_cfgimpl_'):
            # internal slots are stored directly, without any validation
            object.__setattr__(self, name, value)
            return
        self._setattr(name, value)

    def _setattr(self, name, value, force_permissive=False):
        """Validate then set the value of the option called `name`.

        :param name: option name, or dotted path to an option
        :param value: the new value
        :param force_permissive: forwarded to `_validate`, also while
                                 walking through a dotted path
        """
        if '.' in name:
            # keep force_permissive while walking the path and on the final
            # assignment, otherwise a permissive write through a path
            # silently becomes a strict one
            homeconfig, name = self.cfgimpl_get_home_by_path(
                name, force_permissive=force_permissive)
            return homeconfig._setattr(name, value,
                                       force_permissive=force_permissive)
        child = getattr(self._cfgimpl_descr, name)
        # symlinks are not validated here
        if not isinstance(child, SymLinkOption):
            self._validate(name, child, force_permissive=force_permissive)
        self.setoption(name, value)

    def _validate(self, name, opt_or_descr, force_permissive=False):
        """Validation for the setattr and the getattr.

        :param name: name used in the error message
        :param opt_or_descr: the `Option` or `OptionDescription` accessed
        :param force_permissive: if True, the global permissive properties
                                 are ignored even when the 'permissive'
                                 setting is not active
        :raises TypeError: `opt_or_descr` is neither an `Option` nor an
                           `OptionDescription`
        :raises PropertiesOptionError: at least one property still blocks
                                       the access
        """
        apply_requires(opt_or_descr, self)
        if not isinstance(opt_or_descr, (Option, OptionDescription)):
            raise TypeError('Unexpected object: {0}'.format(repr(opt_or_descr)))
        settings = self.cfgimpl_get_settings()
        properties = set(settings.get_properties(opt_or_descr))
        # these properties are validated in value/setting, not here
        properties -= set(['mandatory', 'frozen'])
        # only the properties that are also set globally can block the access
        properties &= set(settings.get_properties())
        if force_permissive is True or settings.has_property('permissive'):
            properties -= set(settings.get_permissive())
        # the permissive of the option itself is always honoured
        properties -= set(settings.get_permissive(opt_or_descr))
        if properties:
            properties = list(properties)
            raise PropertiesOptionError("trying to access"
                                        " to an option named: {0} with properties"
                                        " {1}".format(name, str(properties)),
                                        properties)

    def __getattr__(self, name):
        "attribute notation mechanism for accessing the value of an option"
        return self._getattr(name)

    def _getattr(self, name, force_permissive=False, force_properties=None):
        """
        attribute notation mechanism for accessing the value of an option

        :param name: attribute name, or dotted path
        :param force_permissive: forwarded to `_validate`
        :param force_properties: forwarded to the values object
        :return: option's value if name is an option name, a `SubConfig`
                 if name is an `OptionDescription`
        :raises AttributeError: `name` is an internal (``_cfgimpl_``) name
                                or a description that is not a child
        """
        # special attributes: __getattr__ is only reached when the regular
        # lookup failed, so an internal name that gets here does not exist.
        # Answering before touching self._cfgimpl_descr avoids an endless
        # recursion on an instance whose slots are not set yet.
        if name.startswith('_cfgimpl_'):
            raise AttributeError("{0} object has no attribute "
                                 "{1}".format(self.__class__, name))
        # attribute access by passing a path,
        # for instance getattr(self, "creole.general.family.adresse_ip_eth0")
        if '.' in name:
            homeconfig, name = self.cfgimpl_get_home_by_path(
                name, force_permissive=force_permissive,
                force_properties=force_properties)
            return homeconfig._getattr(name, force_permissive=force_permissive,
                                       force_properties=force_properties)
        opt_or_descr = getattr(self._cfgimpl_descr, name)
        # symlink options: read the target from the root config, keeping the
        # caller's force_* arguments
        if isinstance(opt_or_descr, SymLinkOption):
            rootconfig = self.cfgimpl_get_context()
            path = rootconfig.cfgimpl_get_description().get_path_by_opt(
                opt_or_descr.opt)
            return rootconfig._getattr(path,
                                       force_permissive=force_permissive,
                                       force_properties=force_properties)
        self._validate(name, opt_or_descr, force_permissive=force_permissive)
        if isinstance(opt_or_descr, OptionDescription):
            children = self.cfgimpl_get_description()._children
            if opt_or_descr not in children[1]:
                raise AttributeError("{0} with name {1} object has "
                                     "no attribute {2}".format(self.__class__,
                                                               opt_or_descr._name,
                                                               name))
            return SubConfig(opt_or_descr, self._cfgimpl_context)
        return self.cfgimpl_get_values()._getitem(
            opt_or_descr, force_properties=force_properties)

    def setoption(self, name, value, who=None):
        """effectively modifies the value of an Option()
        (typically called by the __setattr__)

        :param name: name of a direct child of the description
        :param value: the new value
        :param who: not used here, kept for the callers that pass it
        """
        child = getattr(self._cfgimpl_descr, name)
        child.setoption(self, value)

    def cfgimpl_get_home_by_path(self, path, force_permissive=False, force_properties=None):
        """Walk through every step of `path` but the last one.

        :param path: dotted path
        :returns: tuple (config, name) where `config` is the object reached
                  and `name` the last component of the path
        """
        steps = path.split('.')
        homeconfig = self
        for step in steps[:-1]:
            homeconfig = homeconfig._getattr(step,
                                             force_permissive=force_permissive,
                                             force_properties=force_properties)
        return homeconfig, steps[-1]

    def getkey(self):
        "returns the key computed by the description for this config"
        return self._cfgimpl_descr.getkey(self)

    def __hash__(self):
        return hash(self.getkey())

    def __eq__(self, other):
        "Config comparison"
        # compare against SubConfig (Config is a subclass): testing against
        # Config only made a SubConfig different from itself while its hash
        # said otherwise
        if not isinstance(other, SubConfig):
            return False
        return self.getkey() == other.getkey()

    def __ne__(self, other):
        "Config comparison"
        return not self == other

    # ______________________________________________________________________
    def __iter__(self):
        """Pythonesque way of parsing group's ordered options.
        iteration only on Options (not OptionDescriptions)

        Yields (name, value) tuples; an option whose value cannot be read
        (for instance because of its properties) is skipped.
        """
        for child in self._cfgimpl_descr._children[1]:
            if isinstance(child, OptionDescription):
                continue
            try:
                value = getattr(self, child._name)
            except Exception:
                continue  # option with properties
            # the yield stays outside the try block so that closing the
            # generator is never intercepted
            yield child._name, value

    def iter_all(self):
        """A way of parsing options **and** groups.
        iteration on Options and OptionDescriptions.

        Yields (name, value) tuples; a child that cannot be read is skipped.
        """
        for child in self._cfgimpl_descr._children[1]:
            try:
                value = getattr(self, child._name)
            except Exception:
                continue  # option with properties
            yield child._name, value

    def iter_groups(self, group_type=None):
        """iteration on groups objects only.
        All groups are returned if `group_type` is `None`, otherwise the groups
        can be filtered by categories (families, or whatever).

        :param group_type: if defined, is an instance of `groups.GroupType`
                           or `groups.MasterGroupType` that lives in
                           `setting.groups`
        :raises TypeError: `group_type` is not a `groups.GroupType` (raised
                           when the iteration starts)
        """
        if group_type is not None and \
                not isinstance(group_type, groups.GroupType):
            raise TypeError("Unknown group_type: {0}".format(group_type))
        for child in self._cfgimpl_descr._children[1]:
            if not isinstance(child, OptionDescription):
                continue
            try:
                wanted = (group_type is None or
                          child.get_group_type() == group_type)
                if not wanted:
                    continue
                value = getattr(self, child._name)
            except Exception:
                continue  # group that cannot be read is skipped
            yield child._name, value
    # ______________________________________________________________________

    def __str__(self):
        "Config's string representation"
        lines = ["[%s]" % name for name, _group in self.iter_groups()]
        for name, value in self:
            try:
                lines.append("%s = %s" % (name, value))
            except Exception:
                # best effort: a value that cannot be formatted is left out
                pass
        return '\n'.join(lines)

    __repr__ = __str__

    def getpaths(self, include_groups=False, allpaths=False, mandatory=False):
        """returns a list of all paths in self, recursively, taking care of
        the context of properties (hidden/disabled)

        :param include_groups: if true, OptionDescription are included
        :param allpaths: all the options (even the properties protected ones)
        :param mandatory: includes the mandatory options
        :returns: list of all paths
        """
        paths = []
        for path in self._cfgimpl_descr.getpaths(include_groups=include_groups):
            if allpaths:
                paths.append(path)
                continue
            try:
                getattr(self, path)
            except MandatoryError:
                if mandatory:
                    paths.append(path)
            except PropertiesOptionError:
                pass
            else:
                paths.append(path)
        return paths

    def getpath(self):
        "returns the path of this object's description in the root description"
        descr = self.cfgimpl_get_description()
        context_descr = self.cfgimpl_get_context().cfgimpl_get_description()
        return context_descr.get_path_by_opt(descr)

    def find(self, bytype=None, byname=None, byvalue=None, byattrs=None,
             type_='option'):
        """
            finds a list of options recursively in the config

            :param bytype: Option class (BoolOption, StrOption, ...)
            :param byname: filter by Option._name
            :param byvalue: filter by the option's value
            :param byattrs: dict of option attributes (default, callback...)
            :param type_: forwarded to the root config's `_find`
            :returns: list of matching results
        """
        return self.cfgimpl_get_context()._find(bytype, byname, byvalue,
                                                byattrs, first=False,
                                                type_=type_,
                                                _subpath=self.getpath())

    def find_first(self, bytype=None, byname=None, byvalue=None, byattrs=None,
                   type_='option'):
        """
            finds an option recursively in the config

            :param bytype: Option class (BoolOption, StrOption, ...)
            :param byname: filter by Option._name
            :param byvalue: filter by the option's value
            :param byattrs: dict of option attributes (default, callback...)
            :param type_: forwarded to the root config's `_find`
            :returns: the first matching result (not a list)
        """
        return self.cfgimpl_get_context()._find(bytype, byname, byvalue,
                                                byattrs, first=True,
                                                type_=type_,
                                                _subpath=self.getpath())

    def make_dict(self, flatten=False, _currpath=None, withoption=None, withvalue=None):
        """export the whole config into a `dict`

        :param flatten: if true, keys are the option names instead of the
                        dotted paths
        :param _currpath: internal, path accumulated by the recursive calls
        :param withoption: if set, used as `byname` to search options from
                           the root config; what is exported is the parent
                           of each match
        :param withvalue: used as `byvalue` for that search, requires
                          `withoption`
        :raises ValueError: `withvalue` is given without `withoption`
        :returns: dict of Option's name (or path) and values; recursive
                  calls (non empty `_currpath`) return a list of tuples
        """
        pathsvalues = []
        if _currpath is None:
            _currpath = []
        if withoption is None and withvalue is not None:
            raise ValueError("make_dict can't filtering with value without option")
        if withoption is not None:
            mypath = self.getpath()
            context = self.cfgimpl_get_context()
            # `_find` selects what it returns with `type_`; it has no `ret`
            # keyword
            for path in context._find(bytype=Option, byname=withoption,
                                      byvalue=withvalue, byattrs=None,
                                      first=False, type_='path',
                                      _subpath=mypath):
                # work on the parent of the option found
                path = '.'.join(path.split('.')[:-1])
                opt = context.cfgimpl_get_description().get_opt_by_path(path)
                if mypath is not None:
                    if mypath == path:
                        # the match is a direct child of self: export
                        # everything, as if no filter was given
                        withoption = None
                        withvalue = None
                        break
                    tmypath = mypath + '.'
                    if not path.startswith(tmypath):
                        raise Exception('unexpected path {}, '
                                        'should start with {}'.format(path, mypath))
                    # make the path relative to self
                    path = path[len(tmypath):]
                self._make_sub_dict(opt, path, pathsvalues, _currpath, flatten)
        # withoption can have been reset to None in the loop above
        if withoption is None:
            for opt in self.cfgimpl_get_description().getchildren():
                self._make_sub_dict(opt, opt._name, pathsvalues, _currpath,
                                    flatten)
        if not _currpath:
            return dict(pathsvalues)
        return pathsvalues

    def _make_sub_dict(self, opt, path, pathsvalues, _currpath, flatten):
        """Append to `pathsvalues` the (name, value) tuples found for `opt`.

        :param opt: an `OptionDescription` (exported recursively) or an option
        :param path: path of `opt` relative to self
        :param pathsvalues: list of tuples, modified in place
        :param _currpath: list of the path components already walked through
        :param flatten: if true, names are not prefixed with their path
        """
        if isinstance(opt, OptionDescription):
            try:
                subconfig = getattr(self, path)
            except PropertiesOptionError:
                return  # hidden or disabled group, skipped like an option
            pathsvalues += subconfig.make_dict(flatten,
                                               _currpath + path.split('.'))
            return
        try:
            value = self._getattr(opt._name)
        except PropertiesOptionError:
            return  # this is just a hidden or disabled option
        if flatten:
            name = opt._name
        else:
            name = '.'.join(_currpath + [opt._name])
        pathsvalues.append((name, value))
|
|
|
|
|
|
# ____________________________________________________________
|
|
class Config(SubConfig):
    """Main configuration management entry.

    The root of a configuration: it is its own context and it owns the
    settings (``_cfgimpl_settings``) and the values (``_cfgimpl_values``).
    """
    __slots__ = ('_cfgimpl_settings', '_cfgimpl_values')

    def __init__(self, descr):
        """ Configuration option management master class

        :param descr: describes the configuration schema
        :type descr: an instance of ``option.OptionDescription``
        """
        self._cfgimpl_settings = Setting()
        self._cfgimpl_values = Values(self)
        # the root config is its own context
        super(Config, self).__init__(descr, self)
        self._cfgimpl_build_all_paths()

    def _cfgimpl_build_all_paths(self):
        "asks the description to build its cache"
        self._cfgimpl_descr.build_cache()

    def unwrap_from_path(self, path):
        """convenience method to extract and Option() object from the Config()
        and it is **fast**: finds the option directly in the appropriate
        namespace

        :param path: option name or dotted path
        :returns: Option()
        """
        if '.' in path:
            homeconfig, path = self.cfgimpl_get_home_by_path(path)
            return getattr(homeconfig._cfgimpl_descr, path)
        return getattr(self._cfgimpl_descr, path)

    def set(self, **kwargs):
        """
        "do what I mean"-interface to option setting. Searches all paths
        starting from that config for matches of the optional arguments
        and sets the found option if the match is not ambiguous.

        :param kwargs: dict of name strings to values.
        :raises AmbigousOptionError: more than one path ends with a key
        :raises NoMatchingOptionFound: no path ends with a key
        """
        all_paths = [p.split(".") for p in self.getpaths(allpaths=True)]
        for key, value in kwargs.items():
            key_p = key.split('.')
            candidates = [p for p in all_paths if p[-len(key_p):] == key_p]
            if len(candidates) == 1:
                name = '.'.join(candidates[0])
                homeconfig, name = self.cfgimpl_get_home_by_path(name)
                try:
                    # reading first checks that the option can be accessed;
                    # any error other than MandatoryError (hidden, disabled
                    # option...) propagates with its original traceback
                    getattr(homeconfig, name)
                except MandatoryError:
                    pass  # an unset mandatory option can still be set
                homeconfig.setoption(name, value)
            elif len(candidates) > 1:
                raise AmbigousOptionError(
                    'more than one option that ends with %s' % (key, ))
            else:
                raise NoMatchingOptionFound(
                    'there is no option that matches %s'
                    ' or the option is hidden or disabled' % (key, ))

    def getpath(self):
        "the root config has no path"
        return None

    def _find(self, bytype, byname, byvalue, byattrs, first, type_='option',
              _subpath=None):
        """
        convenience method for finding an option that lives only in the subtree

        :param bytype: class (or tuple of classes) the option must be an
                       instance of, None to disable the filter
        :param byname: the path must be this name or end with '.' + name,
                       None to disable the filter
        :param byvalue: the option's value must be equal to it, None to
                        disable the filter
        :param byattrs: dict of attribute names and values the option must
                        have, None to disable the filter
        :param first: return only one option if True, a list otherwise
        :param type_: what is returned for a match: 'option' for the option
                      object, 'path' for its path, 'value' for its value
        :param _subpath: if not None, only the paths starting with
                         `_subpath` + '.' are considered
        :raises ValueError: unknown `type_`
        :raises NotFoundError: nothing has been found
        :return: find list or an exception if nothing has been found
        """
        def _filter_by_name(path):
            if byname is None:
                return True
            return path == byname or path.endswith('.' + byname)

        def _filter_by_type(option):
            return bytype is None or isinstance(option, bytype)

        def _filter_by_attrs(option):
            if byattrs is None:
                return True
            for key, expected in byattrs.items():
                if not hasattr(option, key):
                    return False
                if getattr(option, key) != expected:
                    return False
            return True

        if type_ not in ('option', 'path', 'value'):
            raise ValueError('unknown type_ type {} for _find'.format(type_))
        find_results = []
        opts, paths = self.cfgimpl_get_description()._cache_paths
        for option, path in zip(opts, paths):
            if isinstance(option, OptionDescription):
                continue
            if _subpath is not None and not path.startswith(_subpath + '.'):
                continue
            # filters that do not need the value come first
            if not _filter_by_name(path):
                continue
            if not _filter_by_type(option):
                continue
            if not _filter_by_attrs(option):
                continue
            # the value is read once: it is used by the value filter and it
            # removes the options that cannot be accessed
            try:
                value = getattr(self, path)
                if byvalue is not None and not value == byvalue:
                    continue
            except Exception:  # a property restricts the access of the value
                continue
            if type_ == 'value':
                retval = value
            elif type_ == 'path':
                retval = path
            else:
                retval = option
            if first:
                return retval
            find_results.append(retval)
        if not find_results:
            raise NotFoundError("no option found in config with these criteria")
        return find_results
|
|
|
|
|
|
def mandatory_warnings(config):
    """Trace the mandatory Options for which no value has been set.

    Every path of the config's description (groups included) is read with
    the 'mandatory' property forced; a path is reported when that read
    raises `MandatoryError`, and silently skipped when it raises
    `PropertiesOptionError`.

    :param config: the config to inspect
    :returns: generator of mandatory Option's path
    """
    description = config.cfgimpl_get_description()
    forced = ('mandatory',)
    for candidate in description.getpaths(include_groups=True):
        missing = False
        try:
            config._getattr(candidate, force_properties=forced)
        except MandatoryError:
            missing = True
        except PropertiesOptionError:
            continue
        if missing:
            yield candidate
|