tiramisu/tiramisu/config.py

541 lines
24 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"pretty small and local configuration management tool"
# Copyright (C) 2012 Team tiramisu (see README for all contributors)
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# The original `Config` design model is unproudly borrowed from
# the rough pypy's guys: http://codespeak.net/svn/pypy/dist/pypy/config/
# the whole pypy projet is under MIT licence
# ____________________________________________________________
from copy import copy
from tiramisu.error import (HiddenOptionError, ConfigError, NotFoundError,
AmbigousOptionError, ConflictConfigError, NoMatchingOptionFound,
SpecialOwnersError, MandatoryError, MethodCallError,
DisabledOptionError, ModeOptionError)
from tiramisu.option import (OptionDescription, Option, SymLinkOption, group_types,
Multi, apply_requires, modes)
from tiramisu.autolib import special_owners, special_owner_factory
# ______________________________________________________________________
# generic owner. 'default' is the general config owner after init time
default_owner = 'user'
# ____________________________________________________________
class Config(object):
_cfgimpl_hidden = True
_cfgimpl_disabled = True
_cfgimpl_mandatory = True
_cfgimpl_frozen = False
_cfgimpl_owner = default_owner
_cfgimpl_toplevel = None
_cfgimpl_mode = 'normal'
def __init__(self, descr, parent=None, **overrides):
self._cfgimpl_descr = descr
self._cfgimpl_value_owners = {}
self._cfgimpl_parent = parent
# `Config()` indeed takes care of the `Option()`'s values
self._cfgimpl_values = {}
self._cfgimpl_previous_values = {}
# XXX warnings are a great idea, let's make up a better use of it
self._cfgimpl_warnings = []
self._cfgimpl_toplevel = self._cfgimpl_get_toplevel()
# `freeze()` allows us to carry out this calculation again if necessary
self._cfgimpl_frozen = self._cfgimpl_toplevel._cfgimpl_frozen
self._cfgimpl_build(overrides)
def _validate_duplicates(self, children):
duplicates = []
for dup in children:
if dup._name not in duplicates:
duplicates.append(dup._name)
else:
raise ConflictConfigError('duplicate option name: '
'{0}'.format(dup._name))
def _cfgimpl_build(self, overrides):
self._validate_duplicates(self._cfgimpl_descr._children)
for child in self._cfgimpl_descr._children:
if isinstance(child, Option):
if child.is_multi():
childdef = Multi(copy(child.getdefault()), config=self,
child=child)
self._cfgimpl_values[child._name] = childdef
self._cfgimpl_previous_values[child._name] = list(childdef)
else:
childdef = child.getdefault()
self._cfgimpl_values[child._name] = childdef
self._cfgimpl_previous_values[child._name] = childdef
if child.getcallback() is not None:
if child._is_hidden():
self._cfgimpl_value_owners[child._name] = 'auto'
else:
self._cfgimpl_value_owners[child._name] = 'fill'
else:
if child.is_multi():
self._cfgimpl_value_owners[child._name] = ['default' \
for i in range(len(child.getdefault() ))]
else:
self._cfgimpl_value_owners[child._name] = 'default'
elif isinstance(child, OptionDescription):
self._validate_duplicates(child._children)
self._cfgimpl_values[child._name] = Config(child, parent=self)
self.override(overrides)
def cfgimpl_update(self):
"dynamically adds `Option()` or `OptionDescription()`"
# Nothing is static. Everything evolve.
# FIXME this is an update for new options in the schema only
# see the update_child() method of the descr object
for child in self._cfgimpl_descr._children:
if isinstance(child, Option):
if child._name not in self._cfgimpl_values:
if child.is_multi():
self._cfgimpl_values[child._name] = Multi(
copy(child.getdefault()), config=self, child=child)
self._cfgimpl_value_owners[child._name] = ['default' \
for i in range(len(child.getdefault() ))]
else:
self._cfgimpl_values[child._name] = copy(child.getdefault())
self._cfgimpl_value_owners[child._name] = 'default'
elif isinstance(child, OptionDescription):
if child._name not in self._cfgimpl_values:
self._cfgimpl_values[child._name] = Config(child, parent=self)
def override(self, overrides):
for name, value in overrides.iteritems():
homeconfig, name = self._cfgimpl_get_home_by_path(name)
# if there are special_owners, impossible to override
if homeconfig._cfgimpl_value_owners[name] in special_owners:
raise SpecialOwnersError("cannot override option: {0} because "
"of its special owner".format(name))
homeconfig.setoption(name, value, 'default')
def cfgimpl_set_owner(self, owner):
self._cfgimpl_owner = owner
for child in self._cfgimpl_descr._children:
if isinstance(child, OptionDescription):
self._cfgimpl_values[child._name].cfgimpl_set_owner(owner)
# ____________________________________________________________
def cfgimpl_hide(self):
if self._cfgimpl_parent != None:
raise MethodCallError("this method root_hide() shall not be"
"used with non-root Config() object")
rootconfig = self._cfgimpl_get_toplevel()
rootconfig._cfgimpl_hidden = True
def cfgimpl_show(self):
if self._cfgimpl_parent != None:
raise MethodCallError("this method root_hide() shall not be"
"used with non-root Config() object")
rootconfig = self._cfgimpl_get_toplevel()
rootconfig._cfgimpl_hidden = False
# ____________________________________________________________
def cfgimpl_disable(self):
if self._cfgimpl_parent != None:
raise MethodCallError("this method root_hide() shall not be"
"used with non-root Confit() object")
rootconfig = self._cfgimpl_get_toplevel()
rootconfig._cfgimpl_disabled = True
def cfgimpl_enable(self):
if self._cfgimpl_parent != None:
raise MethodCallError("this method root_hide() shall not be"
"used with non-root Confit() object")
rootconfig = self._cfgimpl_get_toplevel()
rootconfig._cfgimpl_disabled = False
# ____________________________________________________________
def __setattr__(self, name, value):
if '.' in name:
homeconfig, name = self._cfgimpl_get_home_by_path(name)
return setattr(homeconfig, name, value)
if name.startswith('_cfgimpl_'):
self.__dict__[name] = value
return
if self._cfgimpl_frozen and getattr(self, name) != value:
raise TypeError("trying to change a value in a frozen config"
": {0} {1}".format(name, value))
if type(getattr(self._cfgimpl_descr, name)) != SymLinkOption:
self._validate(name, getattr(self._cfgimpl_descr, name))
self.setoption(name, value, self._cfgimpl_owner)
def _validate(self, name, opt_or_descr):
apply_requires(opt_or_descr, self)
if not type(opt_or_descr) == OptionDescription:
# hidden options
if self._cfgimpl_toplevel._cfgimpl_hidden and \
(opt_or_descr._is_hidden() or self._cfgimpl_descr._is_hidden()):
raise HiddenOptionError("trying to access to a hidden option:"
" {0}".format(name))
# disabled options
if self._cfgimpl_toplevel._cfgimpl_disabled and \
(opt_or_descr._is_disabled() or self._cfgimpl_descr._is_disabled()):
raise DisabledOptionError("this option is disabled:"
" {0}".format(name))
# expert options
# XXX currently doesn't look at the group, is it really necessary ?
if self._cfgimpl_toplevel._cfgimpl_mode != 'normal':
if opt_or_descr.get_mode() != 'normal':
raise ModeOptionError("this option's mode is not normal:"
" {0}".format(name))
def __getattr__(self, name):
# attribute access by passing a path,
# for instance getattr(self, "creole.general.family.adresse_ip_eth0")
if '.' in name:
homeconfig, name = self._cfgimpl_get_home_by_path(name)
return getattr(homeconfig, name)
opt_or_descr = getattr(self._cfgimpl_descr, name)
# symlink options
if type(opt_or_descr) == SymLinkOption:
return getattr(self, opt_or_descr.path)
self._validate(name, opt_or_descr)
# special attributes
if name.startswith('_cfgimpl_'):
# if it were in __dict__ it would have been found already
return self.__dict__[name]
raise AttributeError("%s object has no attribute %s" %
(self.__class__, name))
if name not in self._cfgimpl_values:
raise AttributeError("%s object has no attribute %s" %
(self.__class__, name))
if name in self._cfgimpl_value_owners:
owner = self._cfgimpl_value_owners[name]
if owner in special_owners:
value = self._cfgimpl_values[name]
if value != None:
if opt_or_descr.is_multi():
if owner == 'fill' and None not in value:
return value
else:
if owner == 'fill' and value != None:
return value
result = special_owner_factory(name, owner,
value=value,
callback=opt_or_descr.getcallback(),
callback_params=opt_or_descr.getcallback_params(),
config=self._cfgimpl_get_toplevel())
# this result **shall not** be a list
# for example, [1, 2, 3, None] -> [1, 2, 3, result]
if isinstance(result, list):
raise ConfigError('invalid calculated value returned'
' for option {0} : shall not be a list'.format(name))
if result != None and not opt_or_descr._validate(result):
raise ConfigError('invalid calculated value returned'
' for option {0}'.format(name))
if opt_or_descr.is_multi():
if value == []:
_result = Multi([result], value.config, value.child)
else:
_result = Multi([], value.config, value.child)
for val in value:
if val == None:
val = result
_result.append(val)
else:
_result = result
return _result
# mandatory options
if not isinstance(opt_or_descr, OptionDescription):
homeconfig = self._cfgimpl_get_toplevel()
mandatory = homeconfig._cfgimpl_mandatory
if opt_or_descr.is_mandatory() and mandatory:
if self._cfgimpl_values[name] == None\
and opt_or_descr.getdefault() == None:
raise MandatoryError("option: {0} is mandatory "
"and shall have a value".format(name))
return self._cfgimpl_values[name]
def __dir__(self):
#from_type = dir(type(self))
from_dict = list(self.__dict__)
extras = list(self._cfgimpl_values)
return sorted(set(extras + from_dict))
def unwrap_from_name(self, name):
# didn't have to stoop so low: `self.get()` must be the proper method
# **and it is slow**: it recursively searches into the namespaces
paths = self.getpaths(allpaths=True)
opts = dict([(path, self.unwrap_from_path(path)) for path in paths])
all_paths = [p.split(".") for p in self.getpaths()]
for pth in all_paths:
if name in pth:
return opts[".".join(pth)]
raise NotFoundError("name: {0} not found".format(name))
def unwrap_from_path(self, path):
# didn't have to stoop so low, `geattr(self, path)` is much better
# **fast**: finds the option directly in the appropriate namespace
if '.' in path:
homeconfig, path = self._cfgimpl_get_home_by_path(path)
return getattr(homeconfig._cfgimpl_descr, path)
return getattr(self._cfgimpl_descr, path)
def __delattr__(self, name):
# if you use delattr you are responsible for all bad things happening
if name.startswith('_cfgimpl_'):
del self.__dict__[name]
return
self._cfgimpl_value_owners[name] = 'default'
opt = getattr(self._cfgimpl_descr, name)
if isinstance(opt, OptionDescription):
raise AttributeError("can't option subgroup")
self._cfgimpl_values[name] = getattr(opt, 'default', None)
def setoption(self, name, value, who=None):
#who is **not necessarily** a owner, because it cannot be a list
#FIXME : sortir le setoption pour les multi, ca ne devrait pas être la
child = getattr(self._cfgimpl_descr, name)
if who == None:
if child.is_multi():
newowner = [self._cfgimpl_owner for i in range(len(value))]
else:
newowner = self._cfgimpl_owner
else:
if type(child) != SymLinkOption:
if child.is_multi():
if type(value) != Multi:
if type(value) == list:
value = Multi(value, self, child)
else:
raise ConfigError("invalid value for option:"
" {0} that is set to multi".format(name))
newowner = [who for i in range(len(value))]
else:
newowner = who
if type(child) != SymLinkOption:
if name not in self._cfgimpl_values:
raise AttributeError('unknown option %s' % (name,))
# special owners, a value with a owner *auto* cannot be changed
oldowner = self._cfgimpl_value_owners[child._name]
if oldowner == 'auto':
if who == 'auto':
raise ConflictConfigError('cannot override value to %s for '
'option %s' % (value, name))
if oldowner == who:
oldvalue = getattr(self, name)
if oldvalue == value: #or who in ("default",):
return
child.setoption(self, value, who)
# if the value owner is 'auto', set the option to hidden
if who == 'auto':
if not child._is_hidden():
child.hide()
if (value is None and who != 'default' and not child.is_multi()):
child.setowner(self, 'default')
self._cfgimpl_values[name] = copy(child.getdefault())
elif (value == [] and who != 'default' and child.is_multi()):
child.setowner(self, ['default' for i in range(len(child.getdefault()))])
self._cfgimpl_values[name] = Multi(copy(child.getdefault()),
config=self, child=child)
else:
child.setowner(self, newowner)
else:
homeconfig = self._cfgimpl_get_toplevel()
child.setoption(homeconfig, value, who)
def set(self, **kwargs):
all_paths = [p.split(".") for p in self.getpaths(allpaths=True)]
for key, value in kwargs.iteritems():
key_p = key.split('.')
candidates = [p for p in all_paths if p[-len(key_p):] == key_p]
if len(candidates) == 1:
name = '.'.join(candidates[0])
homeconfig, name = self._cfgimpl_get_home_by_path(name)
try:
getattr(homeconfig, name)
except MandatoryError:
pass
except Exception, e:
raise e # HiddenOptionError or DisabledOptionError
homeconfig.setoption(name, value, self._cfgimpl_owner)
elif len(candidates) > 1:
raise AmbigousOptionError(
'more than one option that ends with %s' % (key, ))
else:
raise NoMatchingOptionFound(
'there is no option that matches %s'
' or the option is hidden or disabled'% (key, ))
def get(self, name):
paths = self.getpaths(allpaths=True)
pathsvalues = []
for path in paths:
pathname = path.split('.')[-1]
if pathname == name:
try:
value = getattr(self, path)
return value
except Exception, e:
raise e
raise NotFoundError("option {0} not found in config".format(name))
def _cfgimpl_get_home_by_path(self, path):
"""returns tuple (config, name)"""
path = path.split('.')
for step in path[:-1]:
self = getattr(self, step)
return self, path[-1]
def _cfgimpl_get_toplevel(self):
while self._cfgimpl_parent is not None:
self = self._cfgimpl_parent
return self
def cfgimpl_previous_value(self, path):
home, name = self._cfgimpl_get_home_by_path(path)
return home._cfgimpl_previous_values[name]
def get_previous_value(self, name):
return self._cfgimpl_previous_values[name]
def add_warning(self, warning):
self._cfgimpl_get_toplevel()._cfgimpl_warnings.append(warning)
def get_warnings(self):
return self._cfgimpl_get_toplevel()._cfgimpl_warnings
# ____________________________________________________________
# freeze and read-write statuses
def cfgimpl_freeze(self):
rootconfig = self._cfgimpl_get_toplevel()
rootconfig._cfgimpl_frozen = True
self._cfgimpl_frozen = True
def cfgimpl_unfreeze(self):
rootconfig = self._cfgimpl_get_toplevel()
rootconfig._cfgimpl_frozen = False
self._cfgimpl_frozen = False
def is_frozen(self):
# it should be the same value as self._cfgimpl_frozen...
rootconfig = self._cfgimpl_get_toplevel()
return rootconfig.__dict__['_cfgimpl_frozen']
def cfgimpl_read_only(self):
# hung up on freeze, hidden and disabled concepts
self.cfgimpl_freeze()
rootconfig = self._cfgimpl_get_toplevel()
rootconfig._cfgimpl_hidden = False
rootconfig._cfgimpl_disabled = True
rootconfig._cfgimpl_mandatory = True
def cfgimpl_set_mode(self, mode):
# normal or expert mode
rootconfig = self._cfgimpl_get_toplevel()
if mode not in modes:
raise ConfigError("mode {0} not available".format(mode))
rootconfig._cfgimpl_mode = mode
def cfgimpl_read_write(self):
# hung up on freeze, hidden and disabled concepts
self.cfgimpl_unfreeze()
rootconfig = self._cfgimpl_get_toplevel()
rootconfig._cfgimpl_hidden = True
rootconfig._cfgimpl_disabled = True
rootconfig._cfgimpl_mandatory = False
# ____________________________________________________________
def getkey(self):
return self._cfgimpl_descr.getkey(self)
def __hash__(self):
return hash(self.getkey())
def __eq__(self, other):
return self.getkey() == other.getkey()
def __ne__(self, other):
return not self == other
def __iter__(self):
# iteration only on Options (not OptionDescriptions)
for child in self._cfgimpl_descr._children:
if isinstance(child, Option):
try:
yield child._name, getattr(self, child._name)
except:
pass # hidden, disabled option group
def iter_groups(self, group_type=None):
"iteration on OptionDescriptions"
if group_type == None:
groups = group_types
else:
if group_type not in group_types:
raise TypeError("Unknown group_type: {0}".format(group_type))
groups = [group_type]
for child in self._cfgimpl_descr._children:
if isinstance(child, OptionDescription):
try:
if child.get_group_type() in groups:
yield child._name, getattr(self, child._name)
except:
pass # hidden, disabled option
def __str__(self, indent=""):
lines = []
children = [(child._name, child)
for child in self._cfgimpl_descr._children]
children.sort()
for name, child in children:
if self._cfgimpl_value_owners.get(name, None) == 'default':
continue
value = getattr(self, name)
if isinstance(value, Config):
substr = value.__str__(indent + " ")
else:
substr = "%s %s = %s" % (indent, name, value)
if substr:
lines.append(substr)
if indent and not lines:
return '' # hide subgroups with all default values
lines.insert(0, "%s[%s]" % (indent, self._cfgimpl_descr._name,))
return '\n'.join(lines)
def getpaths(self, include_groups=False, allpaths=False):
"""returns a list of all paths in self, recursively, taking care of
the context (hidden/disabled)
"""
paths = []
for path in self._cfgimpl_descr.getpaths(include_groups=include_groups):
try:
value = getattr(self, path)
except Exception, e:
if not allpaths:
pass # hidden or disabled option
else:
paths.append(path) # hidden or disabled option added
else:
paths.append(path)
return paths
def make_dict(config, flatten=False):
paths = config.getpaths()
pathsvalues = []
for path in paths:
if flatten:
pathname = path.split('.')[-1]
else:
pathname = path
try:
value = getattr(config, path)
pathsvalues.append((pathname, value))
except:
pass # this just a hidden or disabled option
options = dict(pathsvalues)
return options
# ____________________________________________________________