tiramisu/tiramisu/config.py

631 lines
26 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"pretty small and local configuration management tool"
# Copyright (C) 2012 Team tiramisu (see AUTHORS for all contributors)
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# The original `Config` design model is unproudly borrowed from
# the rough pypy's guys: http://codespeak.net/svn/pypy/dist/pypy/config/
# the whole pypy projet is under MIT licence
# ____________________________________________________________
from copy import copy
from tiramisu.error import (PropertiesOptionError, ConfigError, NotFoundError,
AmbigousOptionError, ConflictConfigError, NoMatchingOptionFound,
MandatoryError, MethodCallError, NoValueReturned)
from tiramisu.option import (OptionDescription, Option, SymLinkOption,
Multi, apply_requires)
from tiramisu.setting import groups, owners, Setting
from tiramisu.value import OptionValues
# ____________________________________________________________
class Config(object):
"main configuration management entry"
_cfgimpl_toplevel = None
def __init__(self, descr, parent=None, context=None):
""" Configuration option management master class
:param descr: describes the configuration schema
:type descr: an instance of ``option.OptionDescription``
:param parent: is None if the ``Config`` is root parent Config otherwise
:type parent: ``Config``
:param context: the current root config
:type context: `Config`
"""
self._cfgimpl_descr = descr
self._cfgimpl_parent = parent
if parent == None:
self._cfgimpl_settings = Setting()
self._cfgimpl_values = OptionValues()
else:
if context is None:
raise ConfigError("cannot find a value for this config")
self._cfgimpl_settings = None
self._cfgimpl_values = None
if context is None:
self._cfgimpl_context = self
else:
self._cfgimpl_context = context
"warnings are a great idea, let's make up a better use of it"
self._cfgimpl_warnings = []
self._cfgimpl_toplevel = self._cfgimpl_get_toplevel()
self._cfgimpl_build()
def cfgimpl_get_settings(self):
return self._cfgimpl_context._cfgimpl_settings
def cfgimpl_set_settings(self, settings):
if not isinstance(settings, Setting):
raise ConfigError("setting not allowed")
self._cfgimpl_context._cfgimpl_settings = settings
def _validate_duplicates(self, children):
"""duplicates Option names in the schema
:type children: list of `Option` or `OptionDescription`
"""
duplicates = []
for dup in children:
if dup._name not in duplicates:
duplicates.append(dup._name)
else:
raise ConflictConfigError('duplicate option name: '
'{0}'.format(dup._name))
def _cfgimpl_build(self):
"""
- builds the config object from the schema
- settles various default values for options
"""
self._validate_duplicates(self._cfgimpl_descr._children)
#max len for a master/slave group
max_len_child = 0
for child in self._cfgimpl_descr._children:
if isinstance(child, Option):
if child.is_multi():
childdef = Multi(copy(child.getdefault()), config=self,
opt=child)
max_len_child = max(max_len_child, len(childdef))
self._cfgimpl_context._cfgimpl_values[child] = childdef
self._cfgimpl_context._cfgimpl_values.previous_values[child] = list(childdef)
else:
childdef = child.getdefault()
self._cfgimpl_context._cfgimpl_values[child] = childdef
self._cfgimpl_context._cfgimpl_values.previous_values[child] = childdef
child.setowner(self, owners.default)
elif isinstance(child, OptionDescription):
self._validate_duplicates(child._children)
self._cfgimpl_context._cfgimpl_values[child] = Config(child, parent=self,
context=self._cfgimpl_context)
# def cfgimpl_update(self):
# """dynamically adds `Option()` or `OptionDescription()`
# """
# # FIXME this is an update for new options in the schema only
# # see the update_child() method of the descr object
# for child in self._cfgimpl_descr._children:
# if isinstance(child, Option):
# if child._name not in self._cfgimpl_values:
# if child.is_multi():
# self._cfgimpl_values[child._name] = Multi(
# copy(child.getdefault()), config=self, opt=child)
# else:
# self._cfgimpl_values[child._name] = copy(child.getdefault())
# child.setowner(self, owners.default)
# elif isinstance(child, OptionDescription):
# if child._name not in self._cfgimpl_values:
# self._cfgimpl_values[child._name] = Config(child, parent=self)
# ____________________________________________________________
# attribute methods
def __setattr__(self, name, value):
"attribute notation mechanism for the setting of the value of an option"
if name.startswith('_cfgimpl_'):
self.__dict__[name] = value
return
if '.' in name:
homeconfig, name = self._cfgimpl_get_home_by_path(name)
return setattr(homeconfig, name, value)
if type(getattr(self._cfgimpl_descr, name)) != SymLinkOption:
self._validate(name, getattr(self._cfgimpl_descr, name))
self.setoption(name, value,
self._cfgimpl_context._cfgimpl_settings.get_owner())
def _validate(self, name, opt_or_descr, permissive=False):
"validation for the setattr and the getattr"
apply_requires(opt_or_descr, self, permissive=permissive)
if not isinstance(opt_or_descr, Option) and \
not isinstance(opt_or_descr, OptionDescription):
raise TypeError('Unexpected object: {0}'.format(repr(opt_or_descr)))
properties = copy(opt_or_descr.properties)
for proper in copy(properties):
if not self._cfgimpl_context._cfgimpl_settings.has_property(proper):
properties.remove(proper)
if permissive:
for perm in self._cfgimpl_context._cfgimpl_settings.permissive:
if perm in properties:
properties.remove(perm)
if properties != []:
raise PropertiesOptionError("trying to access"
" to an option named: {0} with properties"
" {1}".format(name, str(properties)),
properties)
def _is_empty(self, opt):
"convenience method to know if an option is empty"
if (not opt.is_multi() and self._cfgimpl_context._cfgimpl_values[opt] == None) or \
(opt.is_multi() and (self._cfgimpl_context._cfgimpl_values[opt] == [] or \
None in self._cfgimpl_context._cfgimpl_values[opt])):
return True
return False
def _test_mandatory(self, path, opt):
# mandatory options
mandatory = self._cfgimpl_context._cfgimpl_settings.mandatory
if opt.is_mandatory() and mandatory:
if self._is_empty(opt) and \
opt.is_empty_by_default():
raise MandatoryError("option: {0} is mandatory "
"and shall have a value".format(path))
def __getattr__(self, name):
return self._getattr(name)
def fill_multi(self, opt, result, use_default_multi=False, default_multi=None):
"""fills a multi option with default and calculated values
"""
# FIXME C'EST ENCORE DU N'IMPORTE QUOI
value = self._cfgimpl_context._cfgimpl_values[opt]
if not isinstance(result, list):
_result = [result]
else:
_result = result
return Multi(_result, value.config, opt=value.opt)
def _getattr(self, name, permissive=False):
"""
attribute notation mechanism for accessing the value of an option
:param name: attribute name
:param permissive: permissive doesn't raise some property error
(see ``permissive``)
:return: option's value if name is an option name, OptionDescription
otherwise
"""
# attribute access by passing a path,
# for instance getattr(self, "creole.general.family.adresse_ip_eth0")
if '.' in name:
homeconfig, name = self._cfgimpl_get_home_by_path(name)
return homeconfig._getattr(name, permissive)
opt_or_descr = getattr(self._cfgimpl_descr, name)
# symlink options
if type(opt_or_descr) == SymLinkOption:
rootconfig = self._cfgimpl_get_toplevel()
return getattr(rootconfig, opt_or_descr.path)
if opt_or_descr not in self._cfgimpl_context._cfgimpl_values:
raise AttributeError("%s object has no attribute %s" %
(self.__class__, name))
self._validate(name, opt_or_descr, permissive)
# special attributes
if name.startswith('_cfgimpl_'):
# if it were in __dict__ it would have been found already
return self.__dict__[name]
raise AttributeError("%s object has no attribute %s" %
(self.__class__, name))
if not isinstance(opt_or_descr, OptionDescription):
# options with callbacks
if opt_or_descr.has_callback():
value = self._cfgimpl_context._cfgimpl_values[opt_or_descr]
if (not opt_or_descr.is_frozen() or \
not opt_or_descr.is_forced_on_freeze()) and \
not opt_or_descr.is_default_owner(self):
return value
try:
result = opt_or_descr.getcallback_value(
self._cfgimpl_get_toplevel())
except NoValueReturned, err:
pass
else:
if opt_or_descr.is_multi():
_result = self.fill_multi(opt_or_descr, result)
else:
# this result **shall not** be a list
if isinstance(result, list):
raise ConfigError('invalid calculated value returned'
' for option {0} : shall not be a list'.format(name))
_result = result
if _result != None and not opt_or_descr.validate(_result,
self._cfgimpl_context._cfgimpl_settings.validator):
raise ConfigError('invalid calculated value returned'
' for option {0}'.format(name))
self._cfgimpl_context._cfgimpl_values[opt_or_descr] = _result
opt_or_descr.setowner(self, owners.default)
# frozen and force default
if not opt_or_descr.has_callback() and opt_or_descr.is_forced_on_freeze():
value = opt_or_descr.getdefault()
if opt_or_descr.is_multi():
value = self.fill_multi(opt_or_descr, value,
use_default_multi=True,
default_multi=opt_or_descr.getdefault_multi())
self._cfgimpl_context._cfgimpl_values[opt_or_descr] = value
opt_or_descr.setowner(self, owners.default)
self._test_mandatory(name, opt_or_descr)
value = self._cfgimpl_context._cfgimpl_values[opt_or_descr]
return value
def unwrap_from_name(self, name):
"""convenience method to extract and Option() object from the Config()
**and it is slow**: it recursively searches into the namespaces
:returns: Option()
"""
paths = self.getpaths(allpaths=True)
opts = dict([(path, self.unwrap_from_path(path)) for path in paths])
all_paths = [p.split(".") for p in self.getpaths()]
for pth in all_paths:
if name in pth:
return opts[".".join(pth)]
raise NotFoundError("name: {0} not found".format(name))
def unwrap_from_path(self, path):
"""convenience method to extract and Option() object from the Config()
and it is **fast**: finds the option directly in the appropriate
namespace
:returns: Option()
"""
if '.' in path:
homeconfig, path = self._cfgimpl_get_home_by_path(path)
return getattr(homeconfig._cfgimpl_descr, path)
return getattr(self._cfgimpl_descr, path)
def setoption(self, name, value, who=None):
"""effectively modifies the value of an Option()
(typically called by the __setattr__)
:param who : an object that lives in `setting.owners`
"""
child = getattr(self._cfgimpl_descr, name)
if type(child) != SymLinkOption:
if who == None:
who = self._cfgimpl_context._cfgimpl_settings.owner
if child.is_multi():
if not isinstance(who, owners.DefaultOwner):
if type(value) != Multi:
if type(value) == list:
value = Multi(value, self, child)
else:
raise ConfigError("invalid value for option:"
" {0} that is set to multi".format(name))
else:
value = self.fill_multi(child, child.getdefault(),
use_default_multi=True,
default_multi=child.getdefault_multi())
if not isinstance(who, owners.Owner):
raise TypeError("invalid owner [{0}] for option: {1}".format(
str(who), name))
child.setoption(self, value)
child.setowner(self, who)
else:
homeconfig = self._cfgimpl_get_toplevel()
child.setoption(homeconfig, value)
def set(self, **kwargs):
"""
do what I mean"-interface to option setting. Searches all paths
starting from that config for matches of the optional arguments
and sets the found option if the match is not ambiguous.
:param kwargs: dict of name strings to values.
"""
all_paths = [p.split(".") for p in self.getpaths(allpaths=True)]
for key, value in kwargs.iteritems():
key_p = key.split('.')
candidates = [p for p in all_paths if p[-len(key_p):] == key_p]
if len(candidates) == 1:
name = '.'.join(candidates[0])
homeconfig, name = self._cfgimpl_get_home_by_path(name)
try:
getattr(homeconfig, name)
except MandatoryError:
pass
except Exception, e:
raise e # HiddenOptionError or DisabledOptionError
homeconfig.setoption(name, value,
self._cfgimpl_context._cfgimpl_settings.get_owner())
elif len(candidates) > 1:
raise AmbigousOptionError(
'more than one option that ends with %s' % (key, ))
else:
raise NoMatchingOptionFound(
'there is no option that matches %s'
' or the option is hidden or disabled'% (key, ))
def get(self, name):
"""
same as a `find_first()` method in a config that has identical names:
it returns the first item of an option named `name`
much like the attribute access way, except that
the search for the option is performed recursively in the whole
configuration tree.
**carefull**: very slow !
:returns: option value.
"""
paths = self.getpaths(allpaths=True)
pathsvalues = []
for path in paths:
pathname = path.split('.')[-1]
if pathname == name:
try:
value = getattr(self, path)
return value
except Exception, e:
raise e
raise NotFoundError("option {0} not found in config".format(name))
def _cfgimpl_get_home_by_path(self, path):
""":returns: tuple (config, name)"""
path = path.split('.')
for step in path[:-1]:
self = getattr(self, step)
return self, path[-1]
def _cfgimpl_get_toplevel(self):
":returns: root config"
while self._cfgimpl_parent is not None:
self = self._cfgimpl_parent
return self
def _cfgimpl_get_path(self):
"the path in the attribute access meaning."
subpath = []
obj = self
while obj._cfgimpl_parent is not None:
subpath.insert(0, obj._cfgimpl_descr._name)
obj = obj._cfgimpl_parent
return ".".join(subpath)
# ______________________________________________________________________
# def cfgimpl_previous_value(self, path):
# "stores the previous value"
# home, name = self._cfgimpl_get_home_by_path(path)
# # FIXME fucking name
# return home._cfgimpl_context._cfgimpl_values.previous_values[name]
# def get_previous_value(self, name):
# "for the time being, only the previous Option's value is accessible"
# return self._cfgimpl_context._cfgimpl_values.previous_values[name]
# ______________________________________________________________________
def add_warning(self, warning):
"Config implements its own warning pile. Could be useful"
self._cfgimpl_get_toplevel()._cfgimpl_warnings.append(warning)
def get_warnings(self):
"Config implements its own warning pile"
return self._cfgimpl_get_toplevel()._cfgimpl_warnings
# ____________________________________________________________
def getkey(self):
return self._cfgimpl_descr.getkey(self)
def __hash__(self):
return hash(self.getkey())
def __eq__(self, other):
"Config comparison"
if not isinstance(other, Config):
return False
return self.getkey() == other.getkey()
def __ne__(self, other):
"Config comparison"
return not self == other
# ______________________________________________________________________
def __iter__(self):
"""Pythonesque way of parsing group's ordered options.
iteration only on Options (not OptionDescriptions)"""
for child in self._cfgimpl_descr._children:
if not isinstance(child, OptionDescription):
try:
yield child._name, getattr(self, child._name)
except:
pass # option with properties
def iter_groups(self, group_type=None):
"""iteration on groups objects only.
All groups are returned if `group_type` is `None`, otherwise the groups
can be filtered by categories (families, or whatever).
:param group_type: if defined, is an instance of `groups.GroupType`
or `groups.MasterGroupType` that lives in
`setting.groups`
"""
if group_type is not None:
if not isinstance(group_type, groups.GroupType):
raise TypeError("Unknown group_type: {0}".format(group_type))
for child in self._cfgimpl_descr._children:
if isinstance(child, OptionDescription):
try:
if group_type is not None:
if child.get_group_type() == group_type:
yield child._name, getattr(self, child._name)
else:
yield child._name, getattr(self, child._name)
except:
pass
# ______________________________________________________________________
def __str__(self):
"Config's string representation"
lines = []
for name, grp in self.iter_groups():
lines.append("[%s]" % name)
for name, value in self:
try:
lines.append("%s = %s" % (name, value))
except:
pass
return '\n'.join(lines)
__repr__ = __str__
def getpaths(self, include_groups=False, allpaths=False, mandatory=False):
"""returns a list of all paths in self, recursively, taking care of
the context of properties (hidden/disabled)
:param include_groups: if true, OptionDescription are included
:param allpaths: all the options (event the properties protected ones)
:param mandatory: includes the mandatory options
:returns: list of all paths
"""
paths = []
for path in self._cfgimpl_descr.getpaths(include_groups=include_groups):
try:
value = getattr(self, path)
except MandatoryError:
if mandatory or allpaths:
paths.append(path)
except PropertiesOptionError:
if allpaths:
paths.append(path) # option which have properties added
else:
paths.append(path)
return paths
def _find(self, bytype, byname, byvalue, byattrs, first):
"""
convenience method for finding an option that lives only in the subtree
:param first: return only one option if True, a list otherwise
:return: find list or an exception if nothing has been found
"""
def _filter_by_attrs():
if byattrs is None:
return True
for key, value in byattrs.items():
if not hasattr(option, key):
return False
else:
if getattr(option, key) != value:
return False
else:
continue
return True
def _filter_by_name():
if byname is None:
return True
pathname = path.split('.')[-1]
if pathname == byname:
return True
else:
return False
def _filter_by_value():
if byvalue is None:
return True
try:
value = getattr(self, path)
if value == byvalue:
return True
except: # a property restricts the access of the value
pass
return False
def _filter_by_type():
if bytype is None:
return True
if isinstance(option, bytype):
return True
return False
find_results = []
paths = self.getpaths(allpaths=True)
for path in paths:
try:
option = self.unwrap_from_path(path)
except PropertiesOptionError, err:
continue
if not _filter_by_name():
continue
if not _filter_by_value():
continue
if not _filter_by_type():
continue
if not _filter_by_attrs():
continue
if first:
return option
else:
find_results.append(option)
if find_results == []:
raise NotFoundError("no option found in config with these criteria")
else:
return find_results
def find(self, bytype=None, byname=None, byvalue=None, byattrs=None):
"""
finds a list of options recursively in the config
:param bytype: Option class (BoolOption, StrOption, ...)
:param byname: filter by Option._name
:param byvalue: filter by the option's value
:param byattrs: dict of option attributes (default, callback...)
:returns: list of matching Option objects
"""
return self._find(bytype, byname, byvalue, byattrs, first=False)
def find_first(self, bytype=None, byname=None, byvalue=None, byattrs=None):
"""
finds an option recursively in the config
:param bytype: Option class (BoolOption, StrOption, ...)
:param byname: filter by Option._name
:param byvalue: filter by the option's value
:param byattrs: dict of option attributes (default, callback...)
:returns: list of matching Option objects
"""
return self._find(bytype, byname, byvalue, byattrs, first=True)
def make_dict(config, flatten=False):
"""export the whole config into a `dict`
:returns: dict of Option's name (or path) and values"""
paths = config.getpaths()
pathsvalues = []
for path in paths:
if flatten:
pathname = path.split('.')[-1]
else:
pathname = path
try:
value = getattr(config, path)
pathsvalues.append((pathname, value))
except:
pass # this just a hidden or disabled option
options = dict(pathsvalues)
return options
def mandatory_warnings(config):
"""convenience function to trace Options that are mandatory and
where no value has been set
:returns: generator of mandatory Option's path
FIXME : CAREFULL : not multi-user
"""
mandatory = config._cfgimpl_context._cfgimpl_settings.mandatory
config._cfgimpl_context._cfgimpl_settings.mandatory = True
for path in config._cfgimpl_descr.getpaths(include_groups=True):
try:
value = config._getattr(path, permissive=True)
except MandatoryError:
yield path
except PropertiesOptionError:
pass
config._cfgimpl_context._cfgimpl_settings.mandatory = mandatory