tiramisu/tiramisu/config.py

1051 lines
49 KiB
Python

# -*- coding: utf-8 -*-
# Copyright (C) 2012-2017 Team tiramisu (see AUTHORS for all contributors)
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by the
# Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# The original `Config` design model is unproudly borrowed from
# the rough pypy's guys: http://codespeak.net/svn/pypy/dist/pypy/config/
# the whole pypy projet is under MIT licence
# ____________________________________________________________
"options handler global entry point"
import weakref
import sys
from time import time
from itertools import chain
from .error import PropertiesOptionError, ConfigError, ConflictError
from .option import OptionDescription, Option, SymLinkOption, \
DynSymLinkOption, SynDynOptionDescription
from .option.baseoption import valid_name
from .setting import groups, Settings, default_encoding, undefined
from .storage import get_storages, set_storage, get_default_values_storages
from .value import Values, Multi
from .i18n import _
# Python 3 has no ``xrange``; alias it to ``range`` so the rest of the
# module can keep using the Python 2 name.
if sys.version_info >= (3,):  # pragma: no cover
    xrange = range
class SubConfig(object):
    """Sub configuration management entry.

    The tree is the responsibility of the OptionDescription; SubConfig
    objects are generated on demand. A Config is also a SubConfig.
    The root Config is called "context" below.
    """
    __slots__ = ('_impl_context', '_impl_descr', '_impl_path')

    def __init__(self, descr, context, subpath=None):
        """Configuration option management master class

        :param descr: describes the configuration schema
        :type descr: an instance of ``option.OptionDescription``
        :param context: weak reference to the current root config
        :type context: ``weakref.ref`` to a `Config`
        :param subpath: the path name of this sub configuration
        :type subpath: `str`
        :raises TypeError: if `descr` is neither None nor an option
                           description
        :raises ValueError: if `context` is not a weak reference
        """
        # main option description (None is accepted as "no description")
        if descr is not None and not isinstance(
                descr, (OptionDescription, SynDynOptionDescription)):  # pragma: optional cover
            raise TypeError(_('descr must be an optiondescription, not {0}'
                              ).format(type(descr)))
        self._impl_descr = descr
        # the root config is only ever held through a weak reference
        if not isinstance(context, weakref.ReferenceType):  # pragma: optional cover
            raise ValueError('context must be a Weakref')
        self._impl_context = context
        self._impl_path = subpath
def cfgimpl_reset_cache(self,
                        only_expired=False,
                        only=('values', 'properties', 'permissives', 'settings'),
                        opt=None,
                        path=None,
                        orig_opts=None):
    """reset all settings in cache

    :param only_expired: if True (and no option is given) reset only
                         expired cached values
    :type only_expired: boolean
    :param only: names of the caches to reset ('values', 'properties',
                 'permissives' and/or 'settings')
    :param opt: option whose cache entries must be removed; only used
                when `path` is given too
    :param path: path of `opt`
    :param orig_opts: set handed over to ``reset_cache`` of dependent
                      options; dependencies already in it are skipped.
                      A new set is created when None
    """
    def drop_cached_path(cached_path):
        # remove one path from every cache selected by `only`
        if 'values' in only:
            values._p_.delcache(cached_path)
        if 'settings' in only or 'properties' in only:
            settings._p_.delcache(cached_path)
        if 'settings' in only or 'permissives' in only:
            settings._pp_.delcache(cached_path)

    def reset_one_option_cache(opt, path):
        if opt.__class__.__name__ == 'DynOptionDescription':
            # this option is a DynOptionDescription
            descr = context.cfgimpl_get_description()
            spath = path.split('.')
            subpath = '.'.join(spath[:-1])
            dynopt = getattr(descr, subpath)._getattr(spath[-1], context=context,
                                                      dyn=False)
            for suffix in dynopt._impl_get_suffixes(context):
                drop_cached_path(subpath + '.' + spath[-1] + suffix)
        elif not isinstance(opt, DynSymLinkOption) and opt._is_subdyn():
            # this option is an instance of DynOptionDescription
            descr = context.cfgimpl_get_description()
            spath = path.split('.')
            try:
                subpath = '.'.join(spath[:-2])
                dynsubopt = getattr(descr, subpath)
                spath1 = spath[-2]
                spath2 = spath[-1]
                spath3 = None
            except AttributeError:
                if len(spath) == 2:
                    subpath = '.'.join(spath)
                    spath1 = spath[-2]
                    spath2 = spath[-1]
                    spath3 = None
                    dynsubopt = descr
                else:
                    subpath = '.'.join(spath[:-3])
                    spath1 = spath[-3]
                    spath2 = spath[-2]
                    spath3 = spath[-1]
                    dynsubopt = getattr(descr, subpath)
            dynopt = dynsubopt._getattr(spath1, context=context, dyn=False)
            for suffix in dynopt._impl_get_suffixes(context):
                suffixed = subpath + '.' + spath1 + suffix + '.' + spath2 + suffix
                if spath3:
                    suffixed += '.' + spath3 + suffix
                drop_cached_path(suffixed)
        else:
            # plain option: the properties cache is selected by
            # 'properties' (it used to be tested against 'permissives',
            # so only=('properties',) never dropped the entry)
            drop_cached_path(path)

    def reset_expired_cache():
        # reset cache for expired cache value only
        datetime = int(time())
        if 'values' in only:
            values._p_.reset_expired_cache(datetime)
        if 'settings' in only or 'properties' in only:
            settings._p_.reset_expired_cache(datetime)
        if 'settings' in only or 'permissives' in only:
            settings._pp_.reset_expired_cache(datetime)

    def reset_all_cache():
        if 'values' in only:
            values._p_.reset_all_cache()
        if 'settings' in only:
            settings._p_.reset_all_cache()
            settings._pp_.reset_all_cache()

    if orig_opts is None:
        orig_opts = set()

    context = self._cfgimpl_get_context()
    # `values` / `settings` are only bound when a selected cache needs them
    if 'values' in only:
        values = context.cfgimpl_get_values()
    if 'settings' in only or 'properties' in only or 'permissives' in only:
        settings = context.cfgimpl_get_settings()
    if None not in (opt, path):
        reset_one_option_cache(opt, path)
        # remove cache for option which has dependencies with this option
        if not isinstance(opt, OptionDescription) and \
                not isinstance(opt, SynDynOptionDescription) and \
                opt.impl_is_master_slaves('slave'):
            slaves = opt.impl_get_master_slaves().getslaves(opt)
        else:
            slaves = []
        for option in chain(opt._get_dependencies(self), slaves):
            if option in orig_opts:
                continue
            if 'values' in only:
                option.reset_cache(opt, values, 'values', orig_opts)
            if 'settings' in only:
                option.reset_cache(opt, settings, 'settings', orig_opts)
            else:
                if 'properties' in only:
                    option.reset_cache(opt, settings, 'properties', orig_opts)
                if 'permissives' in only:
                    option.reset_cache(opt, settings, 'permissives', orig_opts)
    elif only_expired:
        reset_expired_cache()
    else:
        reset_all_cache()
def cfgimpl_get_home_by_path(self, path, force_permissive=False,
                             returns_raise=False, _setting_properties=undefined):
    """Walk down a dotted path up to (not including) its last element.

    :param path: dotted path, for instance ``'od1.od2.var'``
    :returns: tuple (config, name) where `config` is the object reached
              with all but the last step and `name` is the last step.
              If one step yields an exception object (possible with
              `returns_raise`), returns (exception, None)
    """
    steps = path.split('.')
    leaf = steps.pop()
    config = self
    for step in steps:
        config = config.getattr(step,
                                force_permissive=force_permissive,
                                returns_raise=returns_raise,
                                _setting_properties=_setting_properties)
        if isinstance(config, Exception):
            return config, None
    return config, leaf
# ______________________________________________________________________
def __iter__(self, force_permissive=False):
    """Pythonesque way of parsing group's ordered options.

    Iterates only on Options (not OptionDescriptions) and yields
    ``(name, value)`` tuples. Options raising PropertiesOptionError are
    skipped.
    """
    description = self.cfgimpl_get_description()
    children = description._impl_getchildren(
        context=self._cfgimpl_get_context())
    for child in children:
        if child.impl_is_optiondescription():
            continue
        try:
            name = child.impl_getname()
            yield name, self.getattr(name,
                                     force_permissive=force_permissive)
        except GeneratorExit:  # pragma: optional cover
            if sys.version_info[0] < 3:
                raise StopIteration
            raise GeneratorExit()
        except PropertiesOptionError:  # pragma: optional cover
            # option with properties
            continue
def iter_all(self, force_permissive=False):
    """A way of parsing options **and** groups.

    Iterates on Options and OptionDescriptions and yields
    ``(name, value)`` tuples; children raising PropertiesOptionError are
    skipped.
    """
    for child in self.cfgimpl_get_description().impl_getchildren():
        try:
            name = child.impl_getname()
            yield name, self.getattr(name,
                                     force_permissive=force_permissive)
        except GeneratorExit:  # pragma: optional cover
            if sys.version_info[0] < 3:
                raise StopIteration
            raise GeneratorExit()
        except PropertiesOptionError:  # pragma: optional cover
            # option with properties
            continue
def iter_groups(self, group_type=None, force_permissive=False):
    """iteration on groups objects only.

    All groups are returned if `group_type` is `None`, otherwise the groups
    can be filtered by categories (families, or whatever).

    :param group_type: if defined, is an instance of `groups.GroupType`
                       or `groups.MasterGroupType` that lives in
                       `setting.groups`
    :raises TypeError: if `group_type` is not None and not a
                       `groups.GroupType`
    """
    if group_type is not None and \
            not isinstance(group_type, groups.GroupType):  # pragma: optional cover
        raise TypeError(_("unknown group_type: {0}").format(group_type))
    description = self.cfgimpl_get_description()
    children = description._impl_getchildren(
        context=self._cfgimpl_get_context())
    for child in children:
        if not child.impl_is_optiondescription():
            continue
        try:
            if group_type is None or \
                    child.impl_get_group_type() == group_type:
                name = child.impl_getname()
                yield name, self.getattr(name, force_permissive=force_permissive)
        except GeneratorExit:  # pragma: optional cover
            if sys.version_info[0] < 3:
                raise StopIteration
            raise GeneratorExit()
        except PropertiesOptionError:  # pragma: optional cover
            continue
# ______________________________________________________________________
def __str__(self):
    """Config's string representation.

    One ``[name]`` line per group followed by one ``name = value`` line
    per option, joined with newlines.
    """
    lines = ["[{0}]".format(name) for name, _grp in self.iter_groups()]
    for name, value in self:
        try:
            entry = "{0} = {1}".format(name, value)
        except UnicodeEncodeError:  # pragma: optional cover
            entry = "{0} = {1}".format(name,
                                       value.encode(default_encoding))
        lines.append(entry)
    return '\n'.join(lines)


__repr__ = __str__
def _cfgimpl_get_context(self):
    """Dereference the weak reference to the root config.

    context could be None, we need to test it
    context is None only if all reference to `Config` object is deleted
    (for example we delete a `Config` and we manipulate a reference to
    old `SubConfig`, `Values`, `Multi` or `Settings`)

    :raises ConfigError: if the root config does not exist anymore
    """
    context = self._impl_context()
    if context is not None:
        return context
    raise ConfigError(_('the context does not exist anymore'))  # pragma: optional cover


def cfgimpl_get_context(self):
    """Public spelling of `_cfgimpl_get_context`."""
    context = self._cfgimpl_get_context()
    return context
def cfgimpl_get_description(self):
    """Return the option description attached to this config.

    :raises ConfigError: if no description is attached
    """
    descr = self._impl_descr
    if descr is not None:
        return descr
    raise ConfigError(_('no option description found for this config'
                        ' (may be GroupConfig)'))  # pragma: optional cover
def cfgimpl_get_settings(self):
    """Return the `_impl_settings` attribute of the root config."""
    context = self._cfgimpl_get_context()
    return context._impl_settings


def cfgimpl_get_values(self):
    """Return the `_impl_values` attribute of the root config."""
    context = self._cfgimpl_get_context()
    return context._impl_values
# ____________________________________________________________
# attribute methods
def __setattr__(self, name, value):
    """Attribute notation for setting the value of an option.

    ``cfg.name = value`` is forwarded to ``cfg.setattr(name, value)``.
    """
    self.setattr(name, value)
def _setattr(self, name, value, force_permissive=False, not_raises=False):
    """use setattr instead of _setattr

    Thin wrapper kept for compatibility.

    :returns: whatever `setattr` returns, so that the error reported
              with ``not_raises=True`` is not lost (it used to be
              discarded)
    """
    return self.setattr(name, value, force_permissive=force_permissive,
                        not_raises=not_raises)
def setattr(self, name, value, force_permissive=False, not_raises=False, index=None,
            _setting_properties=undefined, _commit=True):
    """Set the value of an option.

    :param name: option name, or dotted path relative to this config.
                 Names starting with ``_impl_`` are set as plain object
                 attributes instead
    :param value: the new value
    :param not_raises: forwarded to the values' ``setitem``
    :param index: forwarded to the values' ``setitem``
    :returns: the result of the underlying ``setitem`` (None when it
              returns None). The result is now also propagated when the
              option is a symlink; it used to be dropped in that case
    :raises TypeError: if `name` designates an option description
    """
    if name.startswith('_impl_'):
        return object.__setattr__(self, name, value)
    context = self._cfgimpl_get_context()
    if _setting_properties is undefined:
        _setting_properties = context.cfgimpl_get_settings()._getproperties(read_write=True)
    if '.' in name:  # pragma: optional cover
        # dotted path: delegate to the sub config owning the last element
        homeconfig, name = self.cfgimpl_get_home_by_path(name,
                                                         force_permissive=force_permissive,
                                                         _setting_properties=_setting_properties)
        return homeconfig.setattr(name, value, force_permissive,
                                  not_raises, index=index,
                                  _setting_properties=_setting_properties,
                                  _commit=_commit)
    child = self.cfgimpl_get_description().__getattr__(name,
                                                       context=context)
    if isinstance(child, OptionDescription) or isinstance(child, SynDynOptionDescription):
        raise TypeError(_("can't assign to an OptionDescription"))  # pragma: optional cover
    if child._is_symlinkoption() and \
            not isinstance(child, DynSymLinkOption):  # pragma: no dynoptiondescription cover
        # symlink: set the linked option, addressed from the root config
        path = context.cfgimpl_get_description().impl_get_path_by_opt(
            child._impl_getopt())
        return context.setattr(path, value, force_permissive, not_raises, index=index,
                               _setting_properties=_setting_properties,
                               _commit=_commit)
    subpath = self._get_subpath(name)
    return self.cfgimpl_get_values().setitem(child, value, subpath,
                                             force_permissive=force_permissive,
                                             not_raises=not_raises, index=index,
                                             _setting_properties=_setting_properties,
                                             _commit=_commit)
def __delattr__(self, name):
    """``del cfg.name``: look the option up in the description and pass
    it to ``__delitem__`` of the values object."""
    context = self._cfgimpl_get_context()
    description = self.cfgimpl_get_description()
    child = description.__getattr__(name, context)
    values = self.cfgimpl_get_values()
    values.__delitem__(child)
def __getattr__(self, name):
return self.getattr(name)
def _getattr(self, name, force_permissive=False, validate=True): # pragma: optional cover
"""use getattr instead of _getattr
"""
return self.getattr(name, force_permissive, validate)
def _get_subpath(self, name):
    """Return the path of child `name`: `name` itself when this config
    has no path, otherwise ``<own path>.<name>``."""
    if self._impl_path is None:
        return name
    return self._impl_path + '.' + name
def getattr(self, name, force_permissive=False, validate=True,
            _setting_properties=undefined, _self_properties=undefined, index=None,
            returns_raise=False):
    """
    attribute notation mechanism for accessing the value of an option

    :param name: attribute name, or dotted path, for instance
                 ``"creole.general.family.adresse_ip_eth0"``
    :param returns_raise: if True, an exception object obtained while
                          reading is returned instead of being raised
    :return: option's value if name is an option name, a `SubConfig`
             if name is an option description
    """
    context = self._cfgimpl_get_context()
    if _setting_properties is undefined:
        _setting_properties = context.cfgimpl_get_settings()._getproperties(read_write=True)
    if '.' in name:
        # attribute access by passing a path: resolve the parent first
        homeconfig, name = self.cfgimpl_get_home_by_path(
            name, force_permissive=force_permissive,
            returns_raise=returns_raise, _setting_properties=_setting_properties)
        if isinstance(homeconfig, Exception):
            result = homeconfig
        else:
            result = homeconfig.getattr(name, force_permissive=force_permissive,
                                        validate=validate,
                                        _setting_properties=_setting_properties,
                                        _self_properties=_self_properties,
                                        index=index, returns_raise=returns_raise)
    else:
        option = self.cfgimpl_get_description().__getattr__(name,
                                                            context=context)
        subpath = self._get_subpath(name)
        # a DynSymLinkOption is read like a regular option
        is_dynsymlink = isinstance(option, DynSymLinkOption)
        if not is_dynsymlink and option._is_symlinkoption():  # pragma: no dynoptiondescription cover
            # symlink: read the linked option through the root config
            target = context.cfgimpl_get_description().impl_get_path_by_opt(
                option._impl_getopt())
            result = context.getattr(target, validate=validate,
                                     force_permissive=force_permissive,
                                     _setting_properties=_setting_properties,
                                     _self_properties=_self_properties,
                                     index=index, returns_raise=True)
        elif not is_dynsymlink and option.impl_is_optiondescription():
            props = self.cfgimpl_get_settings().validate_properties(
                option, True, False, path=subpath,
                force_permissive=force_permissive,
                self_properties=_self_properties,
                setting_properties=_setting_properties)
            if props:
                if returns_raise:
                    return props
                raise props
            return SubConfig(option, self._impl_context, subpath)
        else:
            result = self.cfgimpl_get_values()._get_cached_value(
                option, path=subpath,
                validate=validate,
                force_permissive=force_permissive,
                setting_properties=_setting_properties,
                self_properties=_self_properties,
                index=index)
    if not returns_raise and isinstance(result, Exception):
        raise result
    return result
def find(self, bytype=None, byname=None, byvalue=undefined, type_='option',
         check_properties=True, force_permissive=False):
    """
    finds a list of options recursively in the config

    The search is delegated to `_find` of the root config, limited to
    the path of this config.

    :param bytype: Option class (BoolOption, StrOption, ...)
    :param byname: filter by Option.impl_getname()
    :param byvalue: filter by the option's value
    :param type_: kind of result, 'option', 'path' or 'value'
    :returns: list of matching results
    """
    context = self._cfgimpl_get_context()
    subpath = self.cfgimpl_get_path(False)
    return context._find(bytype, byname, byvalue,
                         first=False,
                         type_=type_,
                         _subpath=subpath,
                         check_properties=check_properties,
                         force_permissive=force_permissive)
def find_first(self, bytype=None, byname=None, byvalue=undefined,
               type_='option', raise_if_not_found=True, check_properties=True,
               force_permissive=False):
    """
    finds an option recursively in the config

    The search is delegated to `_find` of the root config with
    ``first=True``, limited to the path of this config.

    :param bytype: Option class (BoolOption, StrOption, ...)
    :param byname: filter by Option.impl_getname()
    :param byvalue: filter by the option's value
    :param type_: kind of result, 'option', 'path' or 'value'
    :param raise_if_not_found: forwarded to `_find`
    :returns: the first matching result
    """
    context = self._cfgimpl_get_context()
    subpath = self.cfgimpl_get_path(False)
    return context._find(
        bytype, byname, byvalue, first=True, type_=type_,
        _subpath=subpath, raise_if_not_found=raise_if_not_found,
        check_properties=check_properties,
        force_permissive=force_permissive)
def _find(self, bytype, byname, byvalue, first, type_='option',
          _subpath=None, check_properties=True, raise_if_not_found=True,
          force_permissive=False, only_path=undefined,
          only_option=undefined, setting_properties=undefined):
    """
    convenience method for finding an option that lives only in the subtree

    :param first: return only one option if True, a list otherwise
    :param type_: kind of result: 'option', 'path' or 'value'
    :param only_path: if set, only the (only_path, only_option) couple
                      is examined instead of searching the description
    :return: find list or an exception if nothing has been found
    :raises ValueError: if `type_` is unknown
    """
    def _read_value(path):
        # the exception, if any, is returned, not raised
        return self.getattr(path, force_permissive=force_permissive,
                            _setting_properties=setting_properties,
                            returns_raise=True)

    if type_ not in ('option', 'path', 'value'):  # pragma: optional cover
        raise ValueError(_('unknown type_ type {0}'
                           'for _find').format(type_))
    find_results = []
    # if value and/or check_properties are set, need all available option
    # If first one has no good value or not good property check second one
    # and so on
    only_first = first is True and byvalue is undefined and \
        check_properties is None
    if only_path is not undefined:
        options = [(only_path, only_option)]
    else:
        options = self.cfgimpl_get_description().impl_get_options_paths(
            bytype, byname, _subpath, only_first,
            self._cfgimpl_get_context())
    for path, option in options:
        # reset for each candidate so that a value read for a previous
        # option can never leak into this one
        value = undefined
        if byvalue is not undefined or check_properties:
            value = _read_value(path)
            if isinstance(value, Exception):
                # remove option with propertyerror, ...
                if isinstance(value, PropertiesOptionError):
                    continue
                raise value  # pragma: no cover
            if byvalue is not undefined:
                if isinstance(value, Multi):
                    if byvalue not in value:
                        continue
                elif not value == byvalue:
                    continue
        if type_ == 'value':
            if value is undefined:
                # no filter has read the value yet (this used to end in
                # an unbound or stale `value`)
                value = _read_value(path)
                if isinstance(value, Exception):
                    raise value
            retval = value
        elif type_ == 'path':
            retval = path
        elif type_ == 'option':
            retval = option
        if first:
            return retval
        find_results.append(retval)
    return self._find_return_results(find_results, raise_if_not_found)
def _find_return_results(self, find_results, raise_if_not_found):
    """Return `find_results` unless it is an empty list.

    For an empty list: raise AttributeError if `raise_if_not_found` is
    true, otherwise return None.
    """
    if find_results != []:
        return find_results
    if raise_if_not_found:  # pragma: optional cover
        raise AttributeError(_("no option found in config"
                               " with these criteria"))
    return None
def make_dict(self, flatten=False, _currpath=None, withoption=None,
              withvalue=undefined, force_permissive=False,
              setting_properties=undefined, fullpath=False):
    """exports the whole config into a `dict`, for example:

    >>> print cfg.make_dict()
    {'od2.var4': None, 'od2.var5': None, 'od2.var6': None}

    :param flatten: returns a dict(name=value) instead of a dict(path=value)
                    ::

                        >>> print cfg.make_dict(flatten=True)
                        {'var5': None, 'var4': None, 'var6': None}

    :param withoption: returns the options that are present in the very same
                       `OptionDescription` than the `withoption` itself::

                         >>> print cfg.make_dict(withoption='var1')
                         {'od2.var4': None, 'od2.var5': None,
                         'od2.var6': None,
                         'od2.var1': u'value',
                         'od1.var1': None,
                         'od1.var3': None,
                         'od1.var2': None}

    :param withvalue: returns the options that have the value `withvalue`
                      ::

                        >>> print c.make_dict(withoption='var1',
                                              withvalue=u'value')
                        {'od2.var4': None,
                        'od2.var5': None,
                        'od2.var6': None,
                        'od2.var1': u'value'}

    :returns: dict of Option's name (or path) and values; a list of
              (name, value) tuples when `_currpath` is not empty
              (recursive calls)
    :raises ValueError: if `withvalue` is given without `withoption`
    """
    pathsvalues = []
    if _currpath is None:
        _currpath = []
    if withoption is None and withvalue is not undefined:  # pragma: optional cover
        raise ValueError(_("make_dict can't filtering with value without "
                           "option"))
    if setting_properties is undefined:
        setting_properties = self.cfgimpl_get_settings()._getproperties(
            read_write=False)
    # keyword arguments shared by every _make_sub_dict call below
    sub_kwargs = dict(force_permissive=force_permissive,
                      setting_properties=setting_properties,
                      fullpath=fullpath)
    if withoption is not None:
        context = self._cfgimpl_get_context()
        matches = context._find(bytype=None, byname=withoption,
                                byvalue=withvalue, first=False,
                                type_='path', _subpath=self.cfgimpl_get_path(False),
                                force_permissive=force_permissive,
                                setting_properties=setting_properties)
        for found in matches:
            # path of the option description holding the matching option
            path = found.rpartition('.')[0]
            opt = context.unwrap_from_path(path, force_permissive=True)
            mypath = self.cfgimpl_get_path()
            if mypath is not None:
                if mypath == path:
                    # the match lives directly in this config: export
                    # every child (see the unfiltered loop below)
                    withoption = None
                    withvalue = undefined
                    break
                prefix = mypath + '.'
                if not path.startswith(prefix):  # pragma: optional cover
                    raise AttributeError(_('unexpected path {0}, '
                                           'should start with {1}'
                                           '').format(path, mypath))
                path = path[len(prefix):]
            self._make_sub_dict(opt, path, pathsvalues, _currpath, flatten,
                                **sub_kwargs)
    # withoption can be set to None above !
    if withoption is None:
        for opt in self.cfgimpl_get_description().impl_getchildren():
            self._make_sub_dict(opt, opt.impl_getname(), pathsvalues,
                                _currpath, flatten, **sub_kwargs)
    if _currpath == []:
        return dict(pathsvalues)
    return pathsvalues
def _make_sub_dict(self, opt, path, pathsvalues, _currpath, flatten,
                   setting_properties, force_permissive=False, fullpath=False):
    """Append to `pathsvalues` the (name, value) tuples of `opt`.

    The value is read with `getattr` at `path`. A PropertiesOptionError
    is silently skipped, any other exception is raised. An option
    description contributes the result of its own `make_dict`.

    :param pathsvalues: list extended in place
    :param _currpath: list of path elements leading to this config
    """
    value = self.getattr(path,
                         force_permissive=force_permissive,
                         _setting_properties=setting_properties,
                         returns_raise=True)
    if isinstance(value, Exception):
        if not isinstance(value, PropertiesOptionError):  # pragma: no cover
            raise value
        return
    if opt.impl_is_optiondescription():
        pathsvalues.extend(value.make_dict(flatten,
                                           _currpath + path.split('.'),
                                           force_permissive=force_permissive,
                                           setting_properties=setting_properties,
                                           fullpath=fullpath))
        return
    if flatten:
        name = opt.impl_getname()
    elif fullpath:
        root_path = self.cfgimpl_get_path()
        if root_path is None:
            name = opt.impl_getname()
        else:
            name = '.'.join([root_path, opt.impl_getname()])
    else:
        name = '.'.join(_currpath + [opt.impl_getname()])
    pathsvalues.append((name, value))
def cfgimpl_get_path(self, dyn=True):
    """Return the path of this sub config.

    :param dyn: if False and the description is a dynoptiondescription,
                return the path registered in the root description for
                ``descr._impl_getopt()`` instead of the stored path
    """
    descr = self.cfgimpl_get_description()
    if dyn or not descr.impl_is_dynoptiondescription():
        return self._impl_path
    root_descr = self._cfgimpl_get_context().cfgimpl_get_description()
    return root_descr.impl_get_path_by_opt(descr._impl_getopt())
class _CommonConfig(SubConfig):
    """abstract base class for the Config, GroupConfig and the MetaConfig"""
    __slots__ = ('_impl_values', '_impl_settings', '_impl_meta', '_impl_test')

    def _impl_build_all_caches(self, force_store_values):
        """Build the caches of the option description if they are not
        built yet, then call ``impl_build_force_store_values``.

        :param force_store_values: forwarded to
                                   ``impl_build_force_store_values``
        """
        descr = self.cfgimpl_get_description()
        caches_ready = descr.impl_already_build_caches()
        if not caches_ready:
            descr._build_cache_option()
            descr._build_cache(self)
        # NOTE(review): indentation was lost in the reviewed source; this
        # call is assumed to run whether or not the caches were already
        # built - confirm against the original file
        descr.impl_build_force_store_values(self, force_store_values)
def read_only(self):
    """read only is a global config's setting, see `settings.py`"""
    settings = self.cfgimpl_get_settings()
    settings.read_only()


def read_write(self):
    """read write is a global config's setting, see `settings.py`"""
    settings = self.cfgimpl_get_settings()
    settings.read_write()
def getowner(self, opt, index=None, force_permissive=False):
    """convenience method to retrieve an option's owner
    from the config itself

    :param opt: an `Option`, `SymLinkOption` or `DynSymLinkOption`
    :raises TypeError: if `opt` is not one of these types
    """
    option_types = (Option, SymLinkOption, DynSymLinkOption)
    if not isinstance(opt, option_types):  # pragma: optional cover
        raise TypeError(_('opt in getowner must be an option not {0}'
                          '').format(type(opt)))
    values = self.cfgimpl_get_values()
    return values.getowner(opt, index=index,
                           force_permissive=force_permissive)
def unwrap_from_path(self, path, force_permissive=False):
    """convenience method to extract and Option() object from the Config()
    and it is **fast**: finds the option directly in the appropriate
    namespace

    :param path: option name or dotted path
    :returns: Option()
    """
    context = self._cfgimpl_get_context()
    config = self
    if '.' in path:
        # resolve the config holding the last element of the path
        config, path = self.cfgimpl_get_home_by_path(
            path, force_permissive=force_permissive)
    return config.cfgimpl_get_description().__getattr__(path, context=context)
def cfgimpl_get_path(self, dyn=True):
    """This implementation always returns None, whatever `dyn` is."""
    return None
def cfgimpl_get_meta(self):
    """Return the object referenced by `_impl_meta`, or None when
    `_impl_meta` is None."""
    meta_ref = self._impl_meta
    if meta_ref is None:
        return None
    return meta_ref()
# information
def impl_set_information(self, key, value):
    """updates the information's attribute

    :param key: information's key (ex: "help", "doc")
    :param value: information's value (ex: "the help string")
    """
    values = self._impl_values
    values.set_information(key, value)
def impl_get_information(self, key, default=undefined):
    """retrieves one information's item

    :param key: the item string (ex: "help")
    :param default: forwarded to ``get_information`` of the values object
    """
    values = self._impl_values
    return values.get_information(key, default)
def impl_del_information(self, key, raises=True):
    """delete one information's item

    :param key: the item string (ex: "help")
    :param raises: forwarded to ``del_information`` of the values object
    """
    values = self._impl_values
    values.del_information(key, raises)
def __getstate__(self):
    """Always raises NotImplementedError."""
    raise NotImplementedError()
def _gen_fake_values(self, session):
    """Build a non persistent `Config` on the same description, sharing
    this config's settings, and load into it this config's values
    exported with ``fake=True``.

    :param session: forwarded to the values storage ``exportation``
    :returns: the new `Config`
    """
    fake_config = Config(self._impl_descr, persistent=False,
                         force_values=get_default_values_storages(),
                         force_settings=self.cfgimpl_get_settings())
    exported = self.cfgimpl_get_values()._p_.exportation(session, fake=True)
    fake_config.cfgimpl_get_values()._p_.importation(exported)
    return fake_config
def duplicate(self, force_values=None, force_settings=None):
    """Create a new `Config` on the same description and copy into it
    the values, the modified properties and the modified permissives of
    this config.

    :param force_values: forwarded to the `Config` constructor
    :param force_settings: forwarded to the `Config` constructor
    :returns: the new `Config`
    """
    config = Config(self._impl_descr, _duplicate=True, force_values=force_values,
                    force_settings=force_settings)
    # values
    session = self.cfgimpl_get_values()._p_.getsession()
    exported = self.cfgimpl_get_values()._p_.exportation(session)
    config.cfgimpl_get_values()._p_.importation(exported)
    # properties
    properties = self.cfgimpl_get_settings()._p_.get_modified_properties()
    config.cfgimpl_get_settings()._p_.set_modified_properties(properties)
    # permissives
    permissives = self.cfgimpl_get_settings()._pp_.get_modified_permissives()
    config.cfgimpl_get_settings()._pp_.set_modified_permissives(permissives)
    return config
# ____________________________________________________________
class Config(_CommonConfig):
    """main configuration management entry"""
    # '_impl_test' is deliberately not listed: it is already a slot of
    # _CommonConfig, and re-declaring a base class slot makes the base
    # descriptor inaccessible and wastes one slot per instance
    __slots__ = ('__weakref__', '_impl_name')

    def __init__(self, descr, session_id=None, persistent=False,
                 force_values=None, force_settings=None,
                 _duplicate=False, _force_store_values=True):
        """ Configuration option management master class

        :param descr: describes the configuration schema
        :type descr: an instance of ``option.OptionDescription``
        :param session_id: session ID is import with persistent Config to
                           retrieve good session
        :type session_id: `str`
        :param persistent: if persistent, don't delete storage when leaving
        :type persistent: `boolean`
        :param force_values: values storage to use; only honoured
                             together with `force_settings`
        :param force_settings: a settings object, or a tuple of two
                               storages handed to `Settings`; only
                               honoured together with `force_values`
        :raises ValueError: if the session ID returned by the storage
                            is not a valid name
        """
        forced = force_settings is not None and force_values is not None
        if forced:
            if isinstance(force_settings, tuple):
                self._impl_settings = Settings(self, force_settings[0], force_settings[1])
            else:
                self._impl_settings = force_settings
            self._impl_values = Values(self, force_values)
        else:
            properties, permissives, values, session_id = get_storages(self, session_id, persistent)
            if not valid_name(session_id):  # pragma: optional cover
                raise ValueError(_("invalid session ID: {0} for config").format(session_id))
            self._impl_settings = Settings(self, properties, permissives)
            self._impl_values = Values(self, values)
        super(Config, self).__init__(descr, weakref.ref(self))
        self._impl_meta = None
        # undocumented option used only in test script
        self._impl_test = False
        if _duplicate is False and not forced:
            self._impl_build_all_caches(_force_store_values)
        self._impl_name = session_id
def impl_getname(self):
    """Return the name stored in `_impl_name` by the constructor."""
    name = self._impl_name
    return name


def impl_getsessionid(self):
    """Return the session ID held by the storage of the values."""
    storage = self._impl_values._p_._storage
    return storage.session_id
class GroupConfig(_CommonConfig):
    """A config made of a list of children configs."""
    __slots__ = ('__weakref__', '_impl_children', '_impl_name')

    def __init__(self, children, session_id=None, persistent=False,
                 _descr=None):
        """
        :param children: list of Config, MetaConfig or GroupConfig; their
                         names must be unique
        :param session_id: forwarded to `get_storages`
        :param persistent: forwarded to `get_storages`
        :param _descr: option description of this config (may be None)
        :raises ValueError: if `children` is not a list or holds an
                            object of another type
        :raises ConflictError: if two children share the same name
        """
        if not isinstance(children, list):
            raise ValueError(_("groupconfig's children must be a list"))
        names = []
        for child in children:
            if not isinstance(child, _CommonConfig):
                raise ValueError(_("groupconfig's children must be Config, MetaConfig or GroupConfig"))
            names.append(child._impl_name)
        if len(set(names)) != len(names):
            # report the first name that appears again later in the list
            for position, name in enumerate(names):
                if name in names[position + 1:]:
                    raise ConflictError(_('config name must be uniq in '
                                          'groupconfig for {0}').format(name))
        self._impl_children = children
        properties, permissives, values, session_id = get_storages(self, session_id, persistent)
        self._impl_settings = Settings(self, properties, permissives)
        self._impl_values = Values(self, values)
        super(GroupConfig, self).__init__(_descr, weakref.ref(self))
        self._impl_meta = None
        # undocumented option used only in test script
        self._impl_test = False
        self._impl_name = session_id
def cfgimpl_get_children(self):
    """Return the list of children configs (the list itself, not a copy)."""
    children = self._impl_children
    return children
def cfgimpl_reset_cache(self,
                        only_expired=False,
                        only=('values', 'settings'),
                        opt=None,
                        path=None,
                        orig_opts=None):
    """Reset the cache of every child and, for a MetaConfig, the cache
    of this config too.

    :param only_expired: forwarded to each ``cfgimpl_reset_cache``
    :param only: forwarded to each ``cfgimpl_reset_cache``
    :param opt: forwarded to each ``cfgimpl_reset_cache``
    :param path: forwarded to each ``cfgimpl_reset_cache``
    :param orig_opts: set shared by all resets done during this call.
                      A new set is created when None; the default used
                      to be one ``set()`` object shared by every call,
                      so anything added to it leaked into later calls
    """
    if orig_opts is None:
        orig_opts = set()
    if isinstance(self, MetaConfig):
        super(GroupConfig, self).cfgimpl_reset_cache(only_expired=only_expired, only=only,
                                                     opt=opt, path=path, orig_opts=orig_opts)
    for child in self._impl_children:
        child.cfgimpl_reset_cache(only_expired=only_expired, only=only, opt=opt, path=path,
                                  orig_opts=orig_opts)
def set_value(self, path, value, _commit=True):
    """Setattr not in current GroupConfig, but in each children

    :returns: list of what the children returned: the lists returned by
              MetaConfig/GroupConfig children are merged, the non None
              results of the other children are appended
    """
    errors = []
    for child in self._impl_children:
        if isinstance(child, MetaConfig):
            errors.extend(child.set_value(path, value, only_config=True, _commit=False))
            continue
        if isinstance(child, GroupConfig):
            errors.extend(child.set_value(path, value, _commit=False))
            continue
        outcome = child.setattr(path, value, not_raises=True, _commit=False)
        if outcome is not None:
            errors.append(outcome)
    if _commit:
        # children were written with _commit=False: commit once here
        self.cfgimpl_get_values()._p_.commit()
    return errors
def find_firsts(self, byname=None, bypath=undefined, byoption=undefined,
                byvalue=undefined, raise_if_not_found=True, _sub=False,
                check_properties=True):
    """Find first not in current GroupConfig, but in each children

    :param _sub: internal, True for recursive calls: the plain list of
                 matching children is returned
    :returns: a new `GroupConfig` made of the matching children (an
              empty one when nothing matches and `raise_if_not_found`
              is False), or a list when `_sub` is True
    """
    ret = []
    # if MetaConfig, all children have same OptionDescription in context
    # so search only one time the option for all children
    if bypath is undefined and byname is not None and \
            isinstance(self, MetaConfig):
        bypath = self._find(bytype=None, byvalue=undefined, byname=byname,
                            first=True, type_='path',
                            check_properties=None,
                            raise_if_not_found=raise_if_not_found)
        if bypath is None:
            # unknown name and raise_if_not_found is False: no child can
            # match, and None is not a path to look an option up with
            return ret if _sub else GroupConfig(ret)
        byname = None
        byoption = self.cfgimpl_get_description(
        ).impl_get_opt_by_path(bypath)

    for child in self._impl_children:
        if isinstance(child, GroupConfig):
            ret.extend(child.find_firsts(byname=byname, bypath=bypath,
                                         byoption=byoption,
                                         byvalue=byvalue,
                                         check_properties=check_properties,
                                         raise_if_not_found=False,
                                         _sub=True))
        elif child._find(None, byname, byvalue, first=True,
                         type_='path', raise_if_not_found=False,
                         check_properties=check_properties,
                         only_path=bypath, only_option=byoption):
            ret.append(child)
    if _sub:
        return ret
    found = self._find_return_results(ret, raise_if_not_found)
    if found is None:
        # nothing found and raise_if_not_found is False: GroupConfig
        # only accepts a list, so hand it an empty one
        found = []
    return GroupConfig(found)
def __str__(self):
    """One ``(name)`` line per child, followed by the representation of
    the parent class when this config has an option description."""
    parts = ['({0})\n'.format(child._impl_name)
             for child in self._impl_children]
    if self._impl_descr is not None:
        parts.append(super(GroupConfig, self).__str__())
    return ''.join(parts)


__repr__ = __str__
def getattr(self, name, force_permissive=False, validate=True,
            _setting_properties=undefined, _self_properties=undefined, index=None,
            returns_raise=False):
    """Return the child config named `name` if there is one, otherwise
    behave like the parent class `getattr`."""
    for child in self._impl_children:
        if child._impl_name == name:
            return child
    parent = super(GroupConfig, self)
    return parent.getattr(name, force_permissive,
                          validate,
                          _self_properties=_self_properties,
                          index=index,
                          _setting_properties=_setting_properties,
                          returns_raise=returns_raise)
class MetaConfig(GroupConfig):
    """A GroupConfig whose children all share one option description."""
    __slots__ = ()

    def __init__(self, children, session_id=None, persistent=False,
                 optiondescription=None, _force_store_values=True):
        """
        :param children: list of configs, or list of session IDs when
                         `optiondescription` is given (one `Config` is
                         then created per session ID)
        :param session_id: forwarded to `GroupConfig`
        :param persistent: forwarded to `GroupConfig` and to the created
                           configs
        :param optiondescription: description used to create the children
        :raises TypeError: if a child is not a config
        :raises ValueError: if a child already has a metaconfig or if the
                            children do not share the same description
        """
        descr = None
        if optiondescription is not None:
            # FIXME _force_store_values must be true if nonexistent !
            children = [Config(optiondescription, persistent=persistent,
                               session_id=child_session_id,
                               _force_store_values=_force_store_values)
                        for child_session_id in children]
        for child in children:
            if not isinstance(child, _CommonConfig):
                raise TypeError(_("metaconfig's children "
                                  "should be config, not {0}"
                                  ).format(type(child)))
            if child.cfgimpl_get_meta() is not None:
                raise ValueError(_("child has already a metaconfig's"))
            child_descr = child.cfgimpl_get_description()
            if descr is None:
                descr = child_descr
            elif descr is not child_descr:
                raise ValueError(_('all config in metaconfig must '
                                   'have the same optiondescription'))
            child._impl_meta = weakref.ref(self)

        super(MetaConfig, self).__init__(children, session_id, persistent,
                                         descr)
def set_value(self, path, value, force_default=False,
              force_dont_change_value=False, force_default_if_same=False,
              only_config=False, _commit=True):
    """Set `value` at `path` in this MetaConfig.

    :param force_default: reset the value of every child
    :param force_default_if_same: reset the value of the children whose
                                  value equals `value`
    :param force_dont_change_value: write the current value of each
                                    child (when it differs from `value`)
                                    into that child before the
                                    MetaConfig is modified
    :param only_config: could be set if you want modify value in all
                        Config included in this MetaConfig
    :returns: list of the errors collected along the way
    :raises ValueError: on incompatible flags
    """
    any_force = force_default or force_default_if_same or force_dont_change_value
    if only_config:
        if any_force:
            raise ValueError(_('force_default, force_default_if_same or '
                               'force_dont_change_value cannot be set with'
                               ' only_config'))
        return super(MetaConfig, self).set_value(path, value, _commit=_commit)
    errors = []
    if any_force:
        if force_default and force_dont_change_value:
            raise ValueError(_('force_default and force_dont_change_value'
                               ' cannot be set together'))
        opt = self.cfgimpl_get_description().impl_get_opt_by_path(path)
        setting_properties = self.cfgimpl_get_settings()._getproperties(read_write=False)
        for child in self._impl_children:
            if force_default_if_same or force_default:
                if force_default_if_same:
                    if child.cfgimpl_get_values()._contains(path):
                        child_value = child.getattr(path)
                    else:
                        child_value = undefined
                if force_default or value == child_value:
                    child.cfgimpl_get_values().reset(opt, path=path,
                                                     validate=False,
                                                     _setting_properties=setting_properties,
                                                     _commit=False)
                    continue
            if force_dont_change_value:
                child_value = child.getattr(path, _setting_properties=setting_properties,
                                            returns_raise=True)
                if isinstance(child_value, Exception):
                    errors.append(child_value)
                elif value != child_value:
                    child_error = child.setattr(path, child_value, _commit=False,
                                                not_raises=True)
                    if child_error is not None:  # pragma: no cover
                        errors.append(child_error)

    own_error = self.setattr(path, value, _commit=_commit, not_raises=True)
    if own_error is not None:
        errors.append(own_error)
    return errors
def reset(self, path):
    """Reset the option at `path` in every child, then in this
    MetaConfig.

    The children are reset with ``_commit=False``; the reset of the
    MetaConfig itself is called without ``_commit``.
    """
    opt = self.cfgimpl_get_description().impl_get_opt_by_path(path)
    setting_properties = self.cfgimpl_get_settings()._getproperties(read_write=False)
    shared = dict(path=path, validate=False,
                  _setting_properties=setting_properties)
    for child in self._impl_children:
        child.cfgimpl_get_values().reset(opt, _commit=False, **shared)
    self.cfgimpl_get_values().reset(opt, **shared)
def new_config(self, session_id, persistent=False):
    """Create a `Config` on the description of this MetaConfig and add
    it to the children.

    :param session_id: forwarded to the `Config` constructor
    :param persistent: forwarded to the `Config` constructor
    :returns: the new `Config`
    :raises ConflictError: if a child already has the same name
    """
    config = Config(self._impl_descr, session_id=session_id,
                    persistent=persistent)
    known_names = [child._impl_name for child in self._impl_children]
    if config._impl_name in known_names:  # pragma: no cover
        raise ConflictError(_('config name must be uniq in '
                              'groupconfig for {0}').format(config._impl_name))
    config._impl_meta = weakref.ref(self)
    self._impl_children.append(config)
    return config