tiramisu/tiramisu/config.py

706 lines
29 KiB
Python

# -*- coding: utf-8 -*-
"options handler global entry point"
# Copyright (C) 2012-2013 Team tiramisu (see AUTHORS for all contributors)
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# The original `Config` design model is unproudly borrowed from
# the rough pypy's guys: http://codespeak.net/svn/pypy/dist/pypy/config/
# the whole pypy project is under MIT licence
# ____________________________________________________________
import weakref
from tiramisu.error import PropertiesOptionError, ConfigError
from tiramisu.option import OptionDescription, Option, SymLinkOption
from tiramisu.setting import groups, Settings, default_encoding
from tiramisu.storage import get_storages, get_storage, set_storage, \
_impl_getstate_setting
from tiramisu.value import Values, Multi
from tiramisu.i18n import _
class SubConfig(object):
    """Sub configuration management entry.

    A ``SubConfig`` pairs one ``OptionDescription`` with a weak reference to
    the root config (the "context").  Values and settings are not stored
    here: they are looked up on the context.
    """
    __slots__ = ('_impl_context', '_impl_descr', '_impl_path')

    def __init__(self, descr, context, subpath=None):
        """Configuration option management master class.

        :param descr: describes the configuration schema
        :type descr: an instance of ``option.OptionDescription``
        :param context: weak reference to the current root config
        :type context: ``weakref.ReferenceType``
        :param subpath: dotted path of this sub config, ``None`` for the root
        :type subpath: `str`
        :raises TypeError: if `descr` is not an ``OptionDescription``
        :raises ValueError: if `context` is not a weak reference
        """
        # main option description
        if not isinstance(descr, OptionDescription):
            raise TypeError(_('descr must be an optiondescription, not {0}'
                              ).format(type(descr)))
        self._impl_descr = descr
        # the root config is only weakly referenced
        if not isinstance(context, weakref.ReferenceType):
            raise ValueError('context must be a Weakref')
        self._impl_context = context
        self._impl_path = subpath

    def cfgimpl_reset_cache(self, only_expired=False, only=('values',
                                                            'settings')):
        "delegates the cache reset to the root config"
        self._cfgimpl_get_context().cfgimpl_reset_cache(only_expired, only)

    def cfgimpl_get_home_by_path(self, path, force_permissive=False,
                                 force_properties=None):
        """Walks every step of a dotted path except the last one.

        :param path: dotted path, relative to this config
        :returns: tuple (config, name) where `config` is what the walk
                  ended on and `name` is the last component of `path`
        """
        steps = path.split('.')
        home = self
        for step in steps[:-1]:
            home = home._getattr(step,
                                 force_permissive=force_permissive,
                                 force_properties=force_properties)
        return home, steps[-1]

    def __hash__(self):
        return hash(self.cfgimpl_get_description().impl_getkey(self))

    def __eq__(self, other):
        "Config's comparison (only `Config` instances can compare equal)"
        if not isinstance(other, Config):
            return False
        return self.cfgimpl_get_description().impl_getkey(self) == \
            other.cfgimpl_get_description().impl_getkey(other)

    def __ne__(self, other):
        "Config's comparison"
        if not isinstance(other, Config):
            return True
        return not self == other

    # ______________________________________________________________________
    def __iter__(self):
        """Pythonesque way of parsing group's ordered options.
        iteration only on Options (not OptionDescriptions)"""
        for child in self.cfgimpl_get_description().impl_getchildren():
            if isinstance(child, OptionDescription):
                continue
            try:
                value = getattr(self, child._name)
            except PropertiesOptionError:
                continue  # option with properties
            # The yield lives outside the try block: GeneratorExit must
            # propagate untouched.  Turning it into StopIteration is a
            # RuntimeError since PEP 479.
            yield child._name, value

    def iter_all(self):
        """A way of parsing options **and** groups.
        iteration on Options and OptionDescriptions."""
        for child in self.cfgimpl_get_description().impl_getchildren():
            try:
                value = getattr(self, child._name)
            except PropertiesOptionError:
                continue  # option with properties
            # see __iter__: GeneratorExit is deliberately not intercepted
            yield child._name, value

    def iter_groups(self, group_type=None):
        """iteration on groups objects only.
        All groups are returned if `group_type` is `None`, otherwise the groups
        can be filtered by categories (families, or whatever).

        :param group_type: if defined, is an instance of `groups.GroupType`
                           or `groups.MasterGroupType` that lives in
                           `setting.groups`
        :raises TypeError: on first iteration, if `group_type` is neither
                           `None` nor a `groups.GroupType`
        """
        if group_type is not None and not isinstance(group_type,
                                                     groups.GroupType):
            raise TypeError(_("unknown group_type: {0}").format(group_type))
        for child in self.cfgimpl_get_description().impl_getchildren():
            if not isinstance(child, OptionDescription):
                continue
            try:
                if group_type is not None and \
                        not child.impl_get_group_type() == group_type:
                    continue
                value = getattr(self, child._name)
            except PropertiesOptionError:
                continue
            # see __iter__: GeneratorExit is deliberately not intercepted
            yield child._name, value

    # ______________________________________________________________________
    def __str__(self):
        "Config's string representation"
        lines = []
        for name, grp in self.iter_groups():
            lines.append("[{0}]".format(name))
        for name, value in self:
            try:
                lines.append("{0} = {1}".format(name, value))
            except UnicodeEncodeError:
                lines.append("{0} = {1}".format(name,
                                                value.encode(default_encoding)))
            except PropertiesOptionError:
                pass
        return '\n'.join(lines)

    __repr__ = __str__

    def _cfgimpl_get_context(self):
        "dereferences the weak reference to the root config"
        return self._impl_context()

    def cfgimpl_get_description(self):
        """:returns: the option description of this config
        :raises ConfigError: if no option description is attached
        """
        if self._impl_descr is None:
            raise ConfigError(_('no option description found for this config'
                                ' (may be metaconfig without meta)'))
        return self._impl_descr

    def cfgimpl_get_settings(self):
        return self._cfgimpl_get_context()._impl_settings

    def cfgimpl_get_values(self):
        return self._cfgimpl_get_context()._impl_values

    # ____________________________________________________________
    # attribute methods
    def __setattr__(self, name, value):
        "attribute notation mechanism for the setting of the value of an option"
        # names starting with '_impl_' are real attributes of the object,
        # everything else is treated as an option name
        if name.startswith('_impl_'):
            object.__setattr__(self, name, value)
            return
        self._setattr(name, value)

    def _setattr(self, name, value, force_permissive=False):
        """Sets the value of the option called `name`.

        :param name: option name, or dotted path relative to this config
        :param value: the new value
        :param force_permissive: forwarded to the values' ``setitem``
        """
        if '.' in name:
            # `force_permissive` has to follow the dotted path, otherwise
            # it is lost as soon as the option lives in a sub config
            homeconfig, name = self.cfgimpl_get_home_by_path(
                name, force_permissive=force_permissive)
            return homeconfig._setattr(name, value,
                                       force_permissive=force_permissive)
        child = getattr(self.cfgimpl_get_description(), name)
        if isinstance(child, SymLinkOption):
            # a symlink is written through the path of its target option,
            # starting from the root config
            context = self._cfgimpl_get_context()
            path = context.cfgimpl_get_description().impl_get_path_by_opt(
                child._opt)
            context._setattr(path, value, force_permissive=force_permissive)
        else:
            if self._impl_path is None:
                path = name
            else:
                path = self._impl_path + '.' + name
            self.cfgimpl_get_values().setitem(child, value, path,
                                              force_permissive=force_permissive)

    def __delattr__(self, name):
        child = getattr(self.cfgimpl_get_description(), name)
        self.cfgimpl_get_values().__delitem__(child)

    def __getattr__(self, name):
        return self._getattr(name)

    def _getattr(self, name, force_permissive=False, force_properties=None,
                 validate=True):
        """
        attribute notation mechanism for accessing the value of an option

        :param name: attribute name, or dotted path relative to this config
        :return: option's value if name is an option name, a `SubConfig` if
                 name is an OptionDescription name
        """
        # attribute access by passing a path,
        # for instance getattr(self, "creole.general.family.adresse_ip_eth0")
        if '.' in name:
            homeconfig, name = self.cfgimpl_get_home_by_path(
                name, force_permissive=force_permissive,
                force_properties=force_properties)
            return homeconfig._getattr(name, force_permissive=force_permissive,
                                       force_properties=force_properties,
                                       validate=validate)
        opt_or_descr = getattr(self.cfgimpl_get_description(), name)
        if self._impl_path is None:
            subpath = name
        else:
            subpath = self._impl_path + '.' + name
        # symlink options
        if isinstance(opt_or_descr, SymLinkOption):
            context = self._cfgimpl_get_context()
            path = context.cfgimpl_get_description().impl_get_path_by_opt(
                opt_or_descr._opt)
            return context._getattr(path, validate=validate,
                                    force_properties=force_properties,
                                    force_permissive=force_permissive)
        elif isinstance(opt_or_descr, OptionDescription):
            self.cfgimpl_get_settings().validate_properties(
                opt_or_descr, True, False, path=subpath,
                force_permissive=force_permissive,
                force_properties=force_properties)
            return SubConfig(opt_or_descr, self._impl_context, subpath)
        else:
            return self.cfgimpl_get_values().getitem(
                opt_or_descr, path=subpath,
                validate=validate,
                force_properties=force_properties,
                force_permissive=force_permissive)

    def find(self, bytype=None, byname=None, byvalue=None, type_='option'):
        """
        finds a list of options recursively in the config

        :param bytype: Option class (BoolOption, StrOption, ...)
        :param byname: filter by Option._name
        :param byvalue: filter by the option's value
        :param type_: 'option', 'path' or 'value': what the list contains
        :returns: list of matching items
        """
        return self._cfgimpl_get_context()._find(bytype, byname, byvalue,
                                                 first=False,
                                                 type_=type_,
                                                 _subpath=self.cfgimpl_get_path()
                                                 )

    def find_first(self, bytype=None, byname=None, byvalue=None,
                   type_='option', display_error=True):
        """
        finds an option recursively in the config

        :param bytype: Option class (BoolOption, StrOption, ...)
        :param byname: filter by Option._name
        :param byvalue: filter by the option's value
        :param type_: 'option', 'path' or 'value': what is returned
        :returns: the first matching item
        """
        return self._cfgimpl_get_context()._find(
            bytype, byname, byvalue, first=True, type_=type_,
            _subpath=self.cfgimpl_get_path(), display_error=display_error)

    def _find(self, bytype, byname, byvalue, first, type_='option',
              _subpath=None, check_properties=True, display_error=True):
        """
        convenience method for finding an option that lives only in the subtree

        :param first: return only one option if True, a list otherwise
        :param type_: 'option', 'path' or 'value'
        :param _subpath: if not None, only paths below it are considered
        :return: find list or an exception if nothing has been found
        :raises ValueError: if `type_` is not one of the three known kinds
        :raises AttributeError: if nothing matches
        """
        def _name_matches(path):
            if byname is None:
                return True
            return path == byname or path.endswith('.' + byname)

        def _value_matches(path):
            if byvalue is None:
                return True
            try:
                value = getattr(self, path)
            except PropertiesOptionError:  # a property is a restriction
                                           # upon the access of the value
                return False
            if isinstance(value, Multi):
                return byvalue in value
            return value == byvalue

        def _type_matches(option):
            return bytype is None or isinstance(option, bytype)

        if type_ not in ('option', 'path', 'value'):
            raise ValueError(_('unknown type_ type {0}'
                               'for _find').format(type_))
        # marker for "the value has not been read in this iteration"
        not_read = object()
        find_results = []
        opts, paths = self.cfgimpl_get_description()._cache_paths
        for option, path in zip(opts, paths):
            if isinstance(option, OptionDescription):
                continue
            if _subpath is not None and not path.startswith(_subpath + '.'):
                continue
            if not _name_matches(path):
                continue
            if not _value_matches(path):
                continue
            if not _type_matches(option):
                continue
            value = not_read
            # remove option with propertyerror, ...
            if byvalue is None and check_properties:
                try:
                    value = getattr(self, path)
                except PropertiesOptionError:
                    # a property restricts the access of the value
                    continue
            if type_ == 'value':
                if value is not_read:
                    # the value was only read inside the value filter (or
                    # not at all): fetch it here instead of using an
                    # unbound local
                    value = getattr(self, path)
                retval = value
            elif type_ == 'path':
                retval = path
            else:
                retval = option
            if first:
                return retval
            find_results.append(retval)
        return self._find_return_results(find_results, display_error)

    def _find_return_results(self, find_results, display_error):
        """:returns: `find_results` when it is not an empty list
        :raises AttributeError: when `find_results` is an empty list; the
                                message is only built if `display_error`
        """
        if find_results == []:
            if display_error:
                raise AttributeError(_("no option found in config"
                                       " with these criteria"))
            # translation is slow
            raise AttributeError()
        return find_results

    def make_dict(self, flatten=False, _currpath=None, withoption=None,
                  withvalue=None):
        """exports the whole config into a `dict`, for example:

        >>> print cfg.make_dict()
        {'od2.var4': None, 'od2.var5': None, 'od2.var6': None}

        :param flatten: returns a dict(name=value) instead of a dict(path=value)
                        ::

                            >>> print cfg.make_dict(flatten=True)
                            {'var5': None, 'var4': None, 'var6': None}

        :param withoption: returns the options that are present in the very same
                           `OptionDescription` than the `withoption` itself::

                                >>> print cfg.make_dict(withoption='var1')
                                {'od2.var4': None, 'od2.var5': None,
                                'od2.var6': None,
                                'od2.var1': u'value',
                                'od1.var1': None,
                                'od1.var3': None,
                                'od1.var2': None}

        :param withvalue: returns the options that have the value `withvalue`
                          ::

                            >>> print c.make_dict(withoption='var1',
                                                  withvalue=u'value')
                            {'od2.var4': None,
                            'od2.var5': None,
                            'od2.var6': None,
                            'od2.var1': u'value'}

        :returns: dict of Option's name (or path) and values; a list of
                  (name, value) tuples when called with a non empty
                  `_currpath`
        :raises ValueError: if `withvalue` is given without `withoption`
        """
        pathsvalues = []
        if _currpath is None:
            _currpath = []
        if withoption is None and withvalue is not None:
            raise ValueError(_("make_dict can't filtering with value without "
                               "option"))
        if withoption is not None:
            mypath = self.cfgimpl_get_path()
            for path in self._cfgimpl_get_context()._find(bytype=Option,
                                                          byname=withoption,
                                                          byvalue=withvalue,
                                                          first=False,
                                                          type_='path',
                                                          _subpath=mypath):
                # keep the path of the parent of the matching option
                path = '.'.join(path.split('.')[:-1])
                opt = self._cfgimpl_get_context().cfgimpl_get_description(
                ).impl_get_opt_by_path(path)
                if mypath is not None:
                    if mypath == path:
                        # the match lives directly in this config: fall
                        # back to the unfiltered export below
                        withoption = None
                        withvalue = None
                        break
                    else:
                        tmypath = mypath + '.'
                        if not path.startswith(tmypath):
                            raise AttributeError(_('unexpected path {0}, '
                                                   'should start with {1}'
                                                   '').format(path, mypath))
                        path = path[len(tmypath):]
                self._make_sub_dict(opt, path, pathsvalues, _currpath, flatten)
        # withoption can be set to None below !
        if withoption is None:
            for opt in self.cfgimpl_get_description().impl_getchildren():
                path = opt._name
                self._make_sub_dict(opt, path, pathsvalues, _currpath, flatten)
        if _currpath == []:
            options = dict(pathsvalues)
            return options
        return pathsvalues

    def _make_sub_dict(self, opt, path, pathsvalues, _currpath, flatten):
        """Appends to `pathsvalues` the (name, value) tuples found for `opt`.

        Options and groups that raise `PropertiesOptionError` are skipped.
        """
        if isinstance(opt, OptionDescription):
            try:
                pathsvalues += getattr(self, path).make_dict(flatten,
                                                             _currpath +
                                                             path.split('.'))
            except PropertiesOptionError:
                pass  # this just a hidden or disabled option
        else:
            try:
                value = self._getattr(opt._name)
                if flatten:
                    name = opt._name
                else:
                    name = '.'.join(_currpath + [opt._name])
                pathsvalues.append((name, value))
            except PropertiesOptionError:
                pass  # this just a hidden or disabled option

    def cfgimpl_get_path(self):
        "path of this config's description, asked to the root description"
        descr = self.cfgimpl_get_description()
        context_descr = self._cfgimpl_get_context().cfgimpl_get_description()
        return context_descr.impl_get_path_by_opt(descr)
class CommonConfig(SubConfig):
    """Abstract base class for the Config and the MetaConfig.

    Adds the slots that hold the values, the settings and the meta config.
    """
    __slots__ = ('_impl_values', '_impl_settings', '_impl_meta')

    def _impl_build_all_paths(self):
        "asks the option description to build its cache"
        description = self.cfgimpl_get_description()
        description.impl_build_cache()

    def read_only(self):
        "read only is a global config's setting, see `settings.py`"
        settings = self.cfgimpl_get_settings()
        settings.read_only()

    def read_write(self):
        "read write is a global config's setting, see `settings.py`"
        settings = self.cfgimpl_get_settings()
        settings.read_write()

    def getowner(self, opt):
        """Convenience method to retrieve an option's owner from the
        config itself.

        :param opt: an `Option` or a `SymLinkOption`
        :raises TypeError: if `opt` is neither of them
        """
        if not isinstance(opt, (Option, SymLinkOption)):
            raise TypeError(_('opt in getowner must be an option not {0}'
                              ).format(type(opt)))
        values = self.cfgimpl_get_values()
        return values.getowner(opt)

    def unwrap_from_path(self, path, force_permissive=False):
        """Convenience method to extract an Option() object from the
        Config(): the option is looked up directly on the option
        description that owns the last component of `path`.

        :param path: option name or dotted path
        :returns: Option()
        """
        home, name = self, path
        if '.' in path:
            home, name = self.cfgimpl_get_home_by_path(
                path, force_permissive=force_permissive)
        return getattr(home.cfgimpl_get_description(), name)

    def cfgimpl_get_path(self):
        "a root config has no path"
        return None

    def cfgimpl_get_meta(self):
        "returns the content of the `_impl_meta` slot"
        return self._impl_meta

    # information
    def impl_set_information(self, key, value):
        """Updates the information's attribute.

        :param key: information's key (ex: "help", "doc")
        :param value: information's value (ex: "the help string")
        """
        self._impl_values.set_information(key, value)

    def impl_get_information(self, key, default=None):
        """Retrieves one information's item.

        :param key: the item string (ex: "help")
        :param default: forwarded as is to the values' `get_information`
        """
        return self._impl_values.get_information(key, default)
# ____________________________________________________________
class Config(CommonConfig):
    "main configuration management entry"
    __slots__ = ('__weakref__', '_impl_test')

    def __init__(self, descr, session_id=None, persistent=False):
        """Configuration option management master class.

        :param descr: describes the configuration schema
        :type descr: an instance of ``option.OptionDescription``
        :param session_id: session ID is important with persistent Config
                           to retrieve the right session
        :type session_id: `str`
        :param persistent: if persistent, don't delete storage when leaving
        :type persistent: `boolean`
        """
        settings_storage, values_storage = get_storages(self, session_id,
                                                        persistent)
        self._impl_settings = Settings(self, settings_storage)
        self._impl_values = Values(self, values_storage)
        # the root config is its own context
        super(Config, self).__init__(descr, weakref.ref(self))
        self._impl_build_all_paths()
        self._impl_meta = None
        # undocumented option used only in test script
        self._impl_test = False

    def __getstate__(self):
        """Builds the pickle state: every filled slot of the class
        hierarchy except the weak references, plus the storage and
        setting descriptions needed by `__setstate__`.

        :raises ConfigError: if the config has a meta or if its storage is
                             not serializable
        """
        if self._impl_meta is not None:
            raise ConfigError('cannot serialize Config with meta')
        slot_names = set()
        for klass in self.__class__.__mro__:
            if klass is object:
                continue
            slot_names.update(klass.__slots__)
        # weak references are rebuilt by __setstate__
        slot_names.difference_update(('_impl_context', '__weakref__'))
        state = {}
        for slot_name in slot_names:
            try:
                state[slot_name] = getattr(self, slot_name)
            except AttributeError:
                # slot never assigned
                pass
        storage = self._impl_values._p_._storage
        if not storage.serializable:
            raise ConfigError('this storage is not serialisable, could be a '
                              'none persistent storage')
        state['_storage'] = {'session_id': storage.session_id,
                             'persistent': storage.persistent}
        state['_impl_setting'] = _impl_getstate_setting()
        return state

    def __setstate__(self, state):
        """Restores the slots saved by `__getstate__`, then rebuilds the
        weak references and the storage."""
        special_keys = ('_storage', '_impl_setting')
        for key, value in state.items():
            if key in special_keys:
                continue
            setattr(self, key, value)
        set_storage(**state['_impl_setting'])
        # every weak reference points back to this very object
        self._impl_context = weakref.ref(self)
        self._impl_settings.context = weakref.ref(self)
        self._impl_values.context = weakref.ref(self)
        storage = get_storage(test=self._impl_test, **state['_storage'])
        self._impl_values._impl_setstate(storage)
        self._impl_settings._impl_setstate(storage)

    def cfgimpl_reset_cache(self,
                            only_expired=False,
                            only=('values', 'settings')):
        """Resets the caches named in `only`.

        :param only_expired: forwarded to each `reset_cache` call
        :param only: container of cache names among 'values' and 'settings'
        """
        if 'values' in only:
            values = self.cfgimpl_get_values()
            values.reset_cache(only_expired=only_expired)
        if 'settings' in only:
            settings = self.cfgimpl_get_settings()
            settings.reset_cache(only_expired=only_expired)
#class MetaConfig(CommonConfig):
# __slots__ = ('_impl_children',)
# def __init__(self, children, meta=True, session_id=None, persistent=False):
# if not isinstance(children, list):
# raise ValueError(_("metaconfig's children must be a list"))
# self._impl_descr = None
# self._impl_path = None
# if meta:
# for child in children:
# if not isinstance(child, CommonConfig):
# raise TypeError(_("metaconfig's children "
# "must be config, not {0}"
# ).format(type(child)))
# if self._impl_descr is None:
# self._impl_descr = child.cfgimpl_get_description()
# elif not self._impl_descr is child.cfgimpl_get_description():
# raise ValueError(_('all config in metaconfig must '
# 'have the same optiondescription'))
# if child.cfgimpl_get_meta() is not None:
# raise ValueError(_("child has already a metaconfig's"))
# child._impl_meta = self
# self._impl_children = children
# settings, values = get_storages(self, session_id, persistent)
# self._impl_settings = Settings(self, settings)
# self._impl_values = Values(self, values)
# self._impl_meta = None
# def cfgimpl_get_children(self):
# return self._impl_children
# def cfgimpl_get_context(self):
# "a meta config is a config wich has a setting, that is itself"
# return self
# def cfgimpl_reset_cache(self,
# only_expired=False,
# only=('values', 'settings')):
# if 'values' in only:
# self.cfgimpl_get_values().reset_cache(only_expired=only_expired)
# if 'settings' in only:
# self.cfgimpl_get_settings().reset_cache(only_expired=only_expired)
# for child in self._impl_children:
# child.cfgimpl_reset_cache(only_expired=only_expired, only=only)
# def set_contexts(self, path, value):
# for child in self._impl_children:
# try:
# if not isinstance(child, MetaConfig):
# setattr(child, path, value)
# else:
# child.set_contexts(path, value)
# except PropertiesOptionError:
# pass
# def find_first_contexts(self, byname=None, bypath=None, byvalue=None,
# type_='path', display_error=True):
# ret = []
# try:
# if bypath is None and byname is not None and \
# self.cfgimpl_get_description() is not None:
# bypath = self._find(bytype=None, byvalue=None, byname=byname,
# first=True, type_='path',
# check_properties=False,
# display_error=display_error)
# except ConfigError:
# pass
# for child in self._impl_children:
# try:
# if not isinstance(child, MetaConfig):
# if bypath is not None:
# if byvalue is not None:
# if getattr(child, bypath) == byvalue:
# ret.append(child)
# else:
# #not raise
# getattr(child, bypath)
# ret.append(child)
# else:
# ret.append(child.find_first(byname=byname,
# byvalue=byvalue,
# type_=type_,
# display_error=False))
# else:
# ret.extend(child.find_first_contexts(byname=byname,
# bypath=bypath,
# byvalue=byvalue,
# type_=type_,
# display_error=False))
# except AttributeError:
# pass
# return self._find_return_results(ret, display_error)
def mandatory_warnings(config):
    """convenience function to trace Options that are mandatory and
    where no value has been set

    The values' cache is reset before the scan and again once the
    generator finishes, is closed early, or is left by an exception.

    :param config: the config to scan
    :returns: generator of mandatory Option's path
    """
    mandatory_only = frozenset(('mandatory',))
    # if value in cache, properties are not calculated
    config.cfgimpl_reset_cache(only=('values',))
    try:
        for path in config.cfgimpl_get_description().impl_getpaths(
                include_groups=True):
            try:
                config._getattr(path, force_properties=mandatory_only)
            except PropertiesOptionError as err:
                is_missing = err.proptype == ['mandatory']
            else:
                is_missing = False
            # yield outside of the except block so that no exception is
            # being handled while the generator is suspended
            if is_missing:
                yield path
    finally:
        # the cache filled while 'mandatory' was forced must never be kept
        config.cfgimpl_reset_cache(only=('values',))