tiramisu/tiramisu/config.py

1245 lines
53 KiB
Python

# -*- coding: utf-8 -*-
# Copyright (C) 2012-2018 Team tiramisu (see AUTHORS for all contributors)
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by the
# Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# The original `Config` design model is unproudly borrowed from
# the rough pypy's guys: http://codespeak.net/svn/pypy/dist/pypy/config/
# the whole pypy project is under MIT licence
# ____________________________________________________________
"options handler global entry point"
import weakref
from time import time
from copy import copy
from .error import PropertiesOptionError, ConfigError, ConflictError, SlaveError
from .option.syndynoptiondescription import SynDynOptionDescription
from .option.masterslave import MasterSlaves
from .option.baseoption import BaseOption, valid_name
from .setting import ConfigBag, groups, Settings, undefined
from .storage import get_storages, get_default_values_storages
from .value import Values # , Multi
from .i18n import _
class SubConfig(object):
    """Sub configuration management entry.

    The tree is the OptionDescription's responsibility. SubConfig objects
    are generated on demand. A Config is also a SubConfig.
    The root Config is called "context" below.
    """
    # _impl_context: weak reference to the root config (type-checked in __init__)
    # _impl_descr:   option description handled by this sub configuration
    # _impl_path:    value of the `subpath` constructor argument (may be None)
    # _impl_length:  len() of the master's value; only assigned by __init__
    #                when the description's group type is `groups.master`
    __slots__ = ('_impl_context',
                 '_impl_descr',
                 '_impl_path',
                 '_impl_length')
def __init__(self,
             descr,
             context,
             config_bag,
             subpath=None):
    """Bind this sub configuration to its description and to the root config.

    :param descr: describes the configuration schema (may be `None`)
    :type descr: an instance of ``option.OptionDescription``
    :param context: weak reference to the current root config
    :type context: `weakref.ReferenceType`
    :param config_bag: bag duplicated with ``copy('nooption')`` to read the
                       master's value when `descr` is a master group
    :param subpath: path name of this sub configuration
    :type subpath: `str`
    :raises TypeError: if `descr` is not an option description
    :raises ValueError: if `context` is not a weak reference
    """
    # main option description
    if descr is not None:
        known_type = isinstance(descr, (BaseOption, SynDynOptionDescription))
        if not known_type or not descr.impl_is_optiondescription():
            raise TypeError(_('descr must be an optiondescription, not {0}'
                              ).format(type(descr)))
    self._impl_descr = descr
    # the root config is only ever held through a weak reference
    if not isinstance(context, weakref.ReferenceType):  # pragma: optional cover
        raise ValueError('context must be a Weakref')
    self._impl_context = context
    self._impl_path = subpath
    if descr is None:
        return
    if descr.impl_get_group_type() == groups.master:
        # remember the master's length for later index checks on slaves
        master = descr.getmaster()
        master_name = master.impl_getname()
        master_bag = config_bag.copy('nooption')
        master_bag.option = master
        master_value = self.getattr(master_name,
                                    None,
                                    master_bag)
        self._impl_length = len(master_value)
def cfgimpl_get_length(self):
    """Return the master length stored in `_impl_length`.

    :raises AttributeError: if `_impl_length` was never assigned
    """
    return self._impl_length
def reset_one_option_cache(self,
                           values,
                           settings,
                           resetted_opts,
                           opt,
                           path):
    """Reset the cache of `opt`, then recursively of its dependencies.

    :param values: object passed to `opt.reset_cache` for the 'values' pass
    :param settings: object passed to `opt.reset_cache` for the 'settings'
                     pass
    :param resetted_opts: list of already handled options; extended in place
    :param opt: option whose cache is reset
    :param path: path of `opt`
    """
    tresetted_opts = copy(resetted_opts)
    opt.reset_cache(opt,
                    path,
                    values,
                    'values',
                    tresetted_opts)
    # NOTE(review): the copy used for the 'values' pass is dropped here, so
    # anything `reset_cache` may have added to it is not kept -- confirm
    # this is intended.
    tresetted_opts = copy(resetted_opts)
    opt.reset_cache(opt,
                    path,
                    settings,
                    'settings',
                    tresetted_opts)
    # NOTE(review): `tresetted_opts` started as a copy of `resetted_opts`,
    # so this extend() re-appends every entry that was already present.
    resetted_opts.extend(tresetted_opts)
    for woption in opt._get_dependencies(self):
        # dependencies are callables returning the option
        # (presumably weak references -- TODO confirm)
        option = woption()
        if option in resetted_opts:
            continue
        option_path = option.impl_getpath(self)
        self.reset_one_option_cache(values,
                                    settings,
                                    resetted_opts,
                                    option,
                                    option_path)
        del option
def cfgimpl_reset_cache(self,
                        only_expired=False,
                        opt=None,
                        path=None,
                        resetted_opts=None):
    """Reset the values and settings caches.

    When both `opt` and `path` are given, only that option (and what
    `reset_one_option_cache` reaches from it) is reset. Otherwise either
    the expired entries or every entry is dropped.

    :param only_expired: if True reset only expired cached values
    :type only_expired: boolean
    :param opt: option to reset, used together with `path`
    :param path: path of `opt`
    :param resetted_opts: list of options already reset (a new list is
                          created when `None`)
    """
    if resetted_opts is None:
        resetted_opts = []
    context = self._cfgimpl_get_context()
    values = context.cfgimpl_get_values()
    settings = context.cfgimpl_get_settings()

    if None not in (opt, path):
        # targeted reset of one option
        if opt not in resetted_opts:
            self.reset_one_option_cache(values,
                                        settings,
                                        resetted_opts,
                                        opt,
                                        path)
        return

    if only_expired:
        # reset cache for expired cache values only
        now = int(time())
        values._p_.reset_expired_cache(now)
        settings._p_.reset_expired_cache(now)
        return

    values._p_.reset_all_cache()
    settings._p_.reset_all_cache()
def cfgimpl_get_home_by_path(self,
                             path,
                             config_bag):
    """Walk the dotted `path` down to the config owning its last element.

    :param path: dotted path of an option
    :param config_bag: bag duplicated with ``copy('nooption')`` for each step
    :returns: tuple (config, name)
    :raises AttributeError: if an intermediate step is not a `SubConfig`
    """
    steps = path.split('.')
    leaf = steps[-1]
    home = self
    for step in steps[:-1]:
        step_bag = config_bag.copy('nooption')
        home = home.getattr(step,
                            None,
                            step_bag)
        if not isinstance(home, SubConfig):
            raise AttributeError(_('unknown option {}').format(leaf))
    return home, leaf
# ______________________________________________________________________
# def __iter__(self, force_permissive=False):
# """Pythonesque way of parsing group's ordered options.
# iteration only on Options (not OptionDescriptions)"""
# setting_properties = self.cfgimpl_get_context().cfgimpl_get_settings().get_context_properties()
# for child in self.cfgimpl_get_description().impl_getchildren(context=self._cfgimpl_get_context()):
# if not child.impl_is_optiondescription():
# try:
# name = child.impl_getname()
# yield name, self.getattr(name,
# force_permissive=force_permissive,
# setting_properties=setting_properties)
# except GeneratorExit: # pragma: optional cover
# if sys.version_info[0] < 3:
# raise StopIteration
# else:
# raise GeneratorExit()
# except PropertiesOptionError: # pragma: optional cover
# pass # option with properties
#
# def iter_all(self, force_permissive=False):
# """A way of parsing options **and** groups.
# iteration on Options and OptionDescriptions."""
# for child in self.cfgimpl_get_description().impl_getchildren():
# try:
# yield child.impl_getname(), self.getattr(child.impl_getname(),
# force_permissive=force_permissive)
# except GeneratorExit: # pragma: optional cover
# if sys.version_info[0] < 3:
# raise StopIteration
# else:
# raise GeneratorExit()
# except PropertiesOptionError: # pragma: optional cover
# pass # option with properties
def iter_groups(self,
                config_bag,
                group_type=None):
    """Iterate over the group children only, yielding (name, sub config).

    All groups are returned if `group_type` is `None`, otherwise only the
    groups whose group type equals `group_type`. Children raising
    `PropertiesOptionError` are skipped.

    :param config_bag: bag used to list the children and to access them
    :param group_type: if defined, is an instance of `groups.GroupType`
                       or `groups.MasterGroupType` that lives in
                       `setting.groups`
    :raises TypeError: if `group_type` is not a `groups.GroupType`
    """
    if group_type is not None:
        if not isinstance(group_type, groups.GroupType):  # pragma: optional cover
            raise TypeError(_("unknown group_type: {0}").format(group_type))
    # the result is unused; the call raises ConfigError when the root
    # config no longer exists
    self._cfgimpl_get_context()
    description = self.cfgimpl_get_description()
    for child in description.impl_getchildren(config_bag):
        if not child.impl_is_optiondescription():
            continue
        child_bag = config_bag.copy('nooption')
        child_bag.option = child
        try:
            if group_type is None or child.impl_get_group_type() == group_type:
                name = child.impl_getname()
                yield name, self.getattr(name,
                                         None,
                                         child_bag)
        except PropertiesOptionError:  # pragma: optional cover
            pass
def cfgimpl_get_children(self, config_bag):
    """Yield the name of every child that passes the properties validation.

    Validation is skipped when the bag's `setting_properties` is `None`.

    :param config_bag: bag used to list the children; duplicated with
                       ``copy('nooption')`` for each child
    """
    context = self._cfgimpl_get_context()
    for child in self.cfgimpl_get_description().impl_getchildren(config_bag):
        child_bag = config_bag.copy('nooption')
        child_bag.option = child
        child_name = child.impl_getname()
        child_path = self._get_subpath(child_name)
        if child_bag.setting_properties is not None:
            try:
                context.cfgimpl_get_settings().validate_properties(child_path,
                                                                   None,
                                                                   child_bag)
            except PropertiesOptionError:
                # child rejected by its properties: not listed
                continue
        yield child_name
# ______________________________________________________________________
# def __str__(self):
# "Config's string representation"
# lines = []
# for name, grp in self.iter_groups():
# lines.append("[{0}]".format(name))
# for name, value in self:
# try:
# lines.append("{0} = {1}".format(name, value))
# except UnicodeEncodeError: # pragma: optional cover
# lines.append("{0} = {1}".format(name,
# value.encode(default_encoding)))
# return '\n'.join(lines)
#
# __repr__ = __str__
def _cfgimpl_get_context(self):
"""context could be None, we need to test it
context is None only if all reference to `Config` object is deleted
(for example we delete a `Config` and we manipulate a reference to
old `SubConfig`, `Values`, `Multi` or `Settings`)
"""
context = self._impl_context()
if context is None: # pragma: optional cover
raise ConfigError(_('the context does not exist anymore'))
return context
def cfgimpl_get_context(self):
    """Public alias of `_cfgimpl_get_context`: return the root config."""
    root = self._cfgimpl_get_context()
    return root
def cfgimpl_get_description(self):
    """Return the option description stored in `_impl_descr`.

    :raises ConfigError: if no description is attached (`_impl_descr` is
                         `None`)
    """
    descr = self._impl_descr
    if descr is not None:
        return descr
    raise ConfigError(_('no option description found for this config'  # pragma: optional cover
                        ' (may be GroupConfig)'))
def cfgimpl_get_settings(self):
    """Return the `_impl_settings` attribute of the root config."""
    root = self._cfgimpl_get_context()
    return root._impl_settings
def cfgimpl_get_values(self):
    """Return the `_impl_values` attribute of the root config."""
    root = self._cfgimpl_get_context()
    return root._impl_values
def setattr(self,
            name,
            index,
            value,
            config_bag,
            _commit=True):
    """Validate then store `value` for the option called `name`.

    Names starting with ``_impl_`` are plain instance attributes and are
    set directly on the object.

    :param name: option name or dotted path
    :param index: index forwarded to the validation and to `setvalue`
    :param value: value to store
    :param config_bag: bag carrying the option; its `option` attribute is
                       filled in when it is `None`
    :param _commit: forwarded to `Values.setvalue`
    :raises ConfigError: when the target is an OptionDescription or a
                         SymLinkOption
    :returns: what `Values.setvalue` returns
    """
    if name.startswith('_impl_'):
        return object.__setattr__(self,
                                  name,
                                  value)
    context = self._cfgimpl_get_context()
    home = self
    if '.' in name:  # pragma: optional cover
        # when set_value
        home, name = self.cfgimpl_get_home_by_path(name,
                                                   config_bag)
    if config_bag.option is None:
        config_bag.option = home.cfgimpl_get_description().impl_getchild(name,
                                                                         config_bag,
                                                                         home)
    if config_bag.option.impl_is_optiondescription() or \
            isinstance(config_bag.option, SynDynOptionDescription):
        raise ConfigError(_("can't assign to an OptionDescription"))  # pragma: optional cover
    if config_bag.option.impl_is_symlinkoption():
        raise ConfigError(_("can't assign to a SymLinkOption"))

    path = home._get_subpath(name)
    if config_bag.setting_properties:
        context.cfgimpl_get_settings().validate_properties(path,
                                                           index,
                                                           config_bag)
    home.cfgimpl_get_description().impl_validate_value(config_bag.option,
                                                       value,
                                                       home)
    return context.cfgimpl_get_values().setvalue(path,
                                                 index,
                                                 value,
                                                 config_bag,
                                                 _commit)
def delattr(self,
            name,
            index,
            config_bag):
    """Reset the value of the option called `name`.

    Without `index` the whole value is reset. With an index the option
    must be a master or a slave.

    :param name: option name or dotted path
    :param index: index to reset, or `None`
    :param config_bag: bag carrying the option; its `option` attribute is
                       filled in when it is `None`
    :raises TypeError: when the target is an OptionDescription or a
                       SymLinkOption
    :raises ValueError: when an index is given for an option that is
                        neither master nor slave
    """
    # the result is unused; the call raises ConfigError when the root
    # config no longer exists
    self._cfgimpl_get_context()
    home = self
    if '.' in name:  # pragma: optional cover
        home, name = self.cfgimpl_get_home_by_path(name,
                                                   config_bag)
    if config_bag.option is None:
        config_bag.option = home.cfgimpl_get_description().impl_getchild(name,
                                                                         config_bag,
                                                                         home)
    option = config_bag.option
    if option.impl_is_optiondescription() or isinstance(option, SynDynOptionDescription):
        raise TypeError(_("can't delete an OptionDescription"))  # pragma: optional cover
    if option.impl_is_symlinkoption():
        raise TypeError(_("can't delete a SymLinkOption"))

    subpath = home._get_subpath(name)
    values = home.cfgimpl_get_values()
    if index is None:
        values.reset(subpath,
                     config_bag)
        return
    if option.impl_is_master_slaves('master'):
        values.reset_master(home,
                            subpath,
                            index,
                            config_bag)
    elif option.impl_is_master_slaves('slave'):
        values.reset_slave(subpath,
                           index,
                           config_bag)
    else:
        raise ValueError(_("can delete value with index only with a master or a slave"))
def _get_subpath(self, name):
if self._impl_path is None:
subpath = name
else:
subpath = self._impl_path + '.' + name
return subpath
def getattr(self,
            name,
            index,
            config_bag,
            returns_option=False,
            iter_slave=False):
    """
    attribute notation mechanism for accessing the value of an option

    :param name: attribute name; a dotted path is first resolved with
                 `cfgimpl_get_home_by_path`
    :param index: index inside a slave option, `None` otherwise
    :param config_bag: bag carrying the option and the settings; its
                       `option` attribute is filled in when it is `None`
    :param returns_option: if `True`, the option object is returned
                           instead of its value
    :param iter_slave: if true, a slave may be read without index; the
                       value of every index is then collected in a list
    :return: option's value if name is an option name, a `SubConfig` if
             name is an OptionDescription
    :raises IndexError: slave read without index (and without
                        `iter_slave`), or truthy index on a non-slave
    :raises SlaveError: index or stored slave length beyond the master
                        length
    """
    if '.' in name:
        # raise Exception('I am sorry ...')
        self, name = self.cfgimpl_get_home_by_path(name,
                                                   config_bag)

    context = self._cfgimpl_get_context()
    option = config_bag.option
    if option is None:
        option = self.cfgimpl_get_description().impl_getchild(name,
                                                              config_bag,
                                                              self)
        config_bag.option = option
    if option.impl_is_symlinkoption():
        if returns_option is True:
            return option
        # follow the link: read the target option from the root config
        opt = option.impl_getopt()
        path = context.cfgimpl_get_description().impl_get_path_by_opt(opt)
        sconfig_bag = config_bag.copy('nooption')
        sconfig_bag.ori_option = option
        sconfig_bag.option = opt
        return context.getattr(path,
                               index,
                               sconfig_bag)

    subpath = self._get_subpath(name)
    if config_bag.setting_properties:
        self.cfgimpl_get_settings().validate_properties(subpath,
                                                        index,
                                                        config_bag)
    if option.impl_is_optiondescription():
        if returns_option is True:
            return option
        return SubConfig(option,
                         self._impl_context,
                         config_bag,
                         subpath)

    if option.impl_is_master_slaves('slave'):
        if index is None and not iter_slave:
            raise IndexError(_('index is mandatory for the slave "{}"'
                               '').format(subpath))
        length = self.cfgimpl_get_length()
        if index is not None and index >= length:
            raise SlaveError(_('index "{}" is higher than the master length "{}" '
                               'for option "{}"').format(index,
                                                         length,
                                                         option.impl_get_display_name()))
        slave_len = self.cfgimpl_get_values()._p_.get_max_length(subpath)
        if slave_len > length:
            # NOTE(review): four arguments are given for three
            # placeholders; `subpath` is never rendered.
            raise SlaveError(_('slave option "{}" has higher length "{}" than the master length "{}"'
                               '').format(option.impl_get_display_name(),
                                          slave_len,
                                          length,
                                          subpath))
    elif index:
        # NOTE(review): truthiness test, so index 0 on a non-slave option
        # is accepted -- confirm whether `index is not None` was meant.
        raise IndexError(_('index is forbidden for the not slave "{}"'
                           '').format(subpath))
    if option.impl_is_master_slaves('slave') and index is None:
        # iter_slave case: one recursive call per master index
        value = []
        length = self.cfgimpl_get_length()
        for idx in range(length):
            config_bag.properties = None
            value.append(self.getattr(name,
                                      idx,
                                      config_bag))
    else:
        value = self.cfgimpl_get_values().get_cached_value(subpath,
                                                           index,
                                                           config_bag)
        if config_bag.validate_properties:
            self.cfgimpl_get_settings().validate_mandatory(subpath,
                                                           index,
                                                           value,
                                                           config_bag)
    #FIXME use the config_bag!
    if returns_option is True:
        return option
    return value
def find(self,
         config_bag,
         bytype=None,
         byname=None,
         byvalue=undefined,
         type_='option'):
    """
    finds a list of options recursively in the config

    The search is delegated to the root config's `_find`, restricted to
    this config's path.

    :param config_bag: bag forwarded to `_find`
    :param bytype: Option class (BoolOption, StrOption, ...)
    :param byname: filter by Option.impl_getname()
    :param byvalue: filter by the option's value
    :param type_: kind of result, forwarded to `_find`
    :returns: list of matching Option objects
    """
    root = self._cfgimpl_get_context()
    search_root = self.cfgimpl_get_path(False)
    return root._find(bytype,
                      byname,
                      byvalue,
                      config_bag,
                      first=False,
                      type_=type_,
                      _subpath=search_root)
def find_first(self,
               config_bag,
               bytype=None,
               byname=None,
               byvalue=undefined,
               type_='option',
               raise_if_not_found=True):
    """
    finds an option recursively in the config

    The search is delegated to the root config's `_find` with
    ``first=True``, restricted to this config's path.

    :param config_bag: bag forwarded to `_find`
    :param bytype: Option class (BoolOption, StrOption, ...)
    :param byname: filter by Option.impl_getname()
    :param byvalue: filter by the option's value
    :param type_: kind of result, forwarded to `_find`
    :param raise_if_not_found: forwarded to `_find`
    :returns: what `_find` returns for the first match
    """
    root = self._cfgimpl_get_context()
    search_root = self.cfgimpl_get_path(False)
    return root._find(bytype,
                      byname,
                      byvalue,
                      config_bag,
                      first=True,
                      type_=type_,
                      _subpath=search_root,
                      raise_if_not_found=raise_if_not_found)
def _find(self,
          bytype,
          byname,
          byvalue,
          config_bag,
          first,
          type_='option',
          _subpath=None,
          raise_if_not_found=True,
          only_path=undefined,
          only_option=undefined):
    """
    convenience method for finding an option that lives only in the subtree

    :param bytype: Option class filter, forwarded to
                   `impl_get_options_paths`
    :param byname: name filter, forwarded to `impl_get_options_paths`
    :param byvalue: value filter; `undefined` disables it
    :param config_bag: bag carrying the settings; copied for each candidate
    :param first: return only one option if True, a list otherwise
    :param type_: 'option', 'path' or 'value': what is returned for a match
    :param _subpath: subtree restriction, forwarded to
                     `impl_get_options_paths`
    :param raise_if_not_found: forwarded to `_find_return_results`
    :param only_path: if set, the single path to examine
    :param only_option: option matching `only_path`
    :return: find list or an exception if nothing has been found
    """
    def _filter_by_value(sconfig_bag):
        # `path` is read from the enclosing loop below
        if byvalue is undefined:
            return True
        try:
            value = self.getattr(path,
                                 None,
                                 sconfig_bag)
        except PropertiesOptionError:
            return False
        if isinstance(value, list):
            return byvalue in value
        else:
            return value == byvalue

    if type_ not in ('option', 'path', 'value'):  # pragma: optional cover
        # NOTE(review): the two literals are joined without a space
        # ("... {0}for _find").
        raise ValueError(_('unknown type_ type {0}'
                           'for _find').format(type_))
    find_results = []
    # if value and/or validate_properties are set, need all available options
    # If first one has no good value or not good property check second one
    # and so on
    only_first = first is True and byvalue is undefined and \
        config_bag.validate_properties is False
    if only_path is not undefined:
        options = [(only_path, only_option)]
    else:
        options = self.cfgimpl_get_description().impl_get_options_paths(bytype,
                                                                        byname,
                                                                        _subpath,
                                                                        only_first,
                                                                        config_bag)
    for path, option in options:
        sconfig_bag = config_bag.copy('nooption')
        sconfig_bag.option = option
        if not _filter_by_value(sconfig_bag):
            continue
        #remove option with propertyerror, ...
        if config_bag.validate_properties:
            try:
                # NOTE(review): the shared `config_bag` (not `sconfig_bag`)
                # is passed here -- confirm this is intended.
                self.unwrap_from_path(path,
                                      None,
                                      config_bag)
                self.cfgimpl_get_settings().validate_properties(path,
                                                                None,
                                                                config_bag)
            except PropertiesOptionError:
                continue
        if type_ == 'value':
            retval = self.getattr(path,
                                  None,
                                  config_bag)
        elif type_ == 'path':
            retval = path
        elif type_ == 'option':
            retval = option
        if first:
            return retval
        else:
            find_results.append(retval)
    return self._find_return_results(find_results,
                                     raise_if_not_found)
def _find_return_results(self,
find_results,
raise_if_not_found):
if find_results == []: # pragma: optional cover
if raise_if_not_found:
raise AttributeError(_("no option found in config"
" with these criteria"))
else:
return find_results
def make_dict(self,
              config_bag,
              flatten=False,
              _currpath=None,
              withoption=None,
              withvalue=undefined,
              fullpath=False):
    """exports the whole config into a `dict`, for example:

    >>> print(cfg.make_dict())
    {'od2.var4': None, 'od2.var5': None, 'od2.var6': None}

    :param config_bag: bag carrying the settings; copied for each option
    :param flatten: returns a dict(name=value) instead of a dict(path=value)
    ::
        >>> print(cfg.make_dict(flatten=True))
        {'var5': None, 'var4': None, 'var6': None}

    :param withoption: returns the options that are present in the very same
                       `OptionDescription` than the `withoption` itself::
        >>> print(cfg.make_dict(withoption='var1'))
        {'od2.var4': None, 'od2.var5': None,
        'od2.var6': None,
        'od2.var1': u'value',
        'od1.var1': None,
        'od1.var3': None,
        'od1.var2': None}

    :param withvalue: returns the options that have the value `withvalue`
    ::
        >>> print(c.make_dict(withoption='var1',
                              withvalue=u'value'))
        {'od2.var4': None,
        'od2.var5': None,
        'od2.var6': None,
        'od2.var1': u'value'}

    :param _currpath: list of parent names used by the recursion; the
                      result is converted to a dict only when it is empty
    :param fullpath: forwarded to `_make_sub_dict`
    :raises ValueError: if `withvalue` is given without `withoption`
    :returns: dict of Option's name (or path) and values
    """
    pathsvalues = []
    if _currpath is None:
        _currpath = []
    if withoption is None and withvalue is not undefined:  # pragma: optional cover
        raise ValueError(_("make_dict can't filtering with value without "
                           "option"))
    if withoption is not None:
        context = self._cfgimpl_get_context()
        for path in context._find(bytype=None,
                                  byname=withoption,
                                  byvalue=withvalue,
                                  first=False,
                                  type_='path',
                                  _subpath=self.cfgimpl_get_path(False),
                                  config_bag=config_bag):
            # keep the parent path of the matching option
            path = '.'.join(path.split('.')[:-1])
            opt = context.unwrap_from_path(path,
                                           None,
                                           config_bag)
            sconfig_bag = config_bag.copy('nooption')
            sconfig_bag.option = opt
            mypath = self.cfgimpl_get_path()
            if mypath is not None:
                if mypath == path:
                    # the match lives directly in this config: fall back to
                    # the unfiltered export below
                    withoption = None
                    withvalue = undefined
                    break
                else:
                    tmypath = mypath + '.'
                    if not path.startswith(tmypath):  # pragma: optional cover
                        raise AttributeError(_('unexpected path {0}, '
                                               'should start with {1}'
                                               '').format(path, mypath))
                    # make the path relative to this config
                    path = path[len(tmypath):]
            self._make_sub_dict(path,
                                pathsvalues,
                                _currpath,
                                flatten,
                                sconfig_bag,
                                fullpath=fullpath)
    #withoption can be set to None below !
    if withoption is None:
        for opt in self.cfgimpl_get_description().impl_getchildren(config_bag):
            sconfig_bag = config_bag.copy('nooption')
            sconfig_bag.option = opt
            path = opt.impl_getname()
            self._make_sub_dict(path,
                                pathsvalues,
                                _currpath,
                                flatten,
                                sconfig_bag,
                                fullpath=fullpath)
    if _currpath == []:
        options = dict(pathsvalues)
        return options
    return pathsvalues
def _make_sub_dict(self,
                   name,
                   pathsvalues,
                   _currpath,
                   flatten,
                   config_bag,
                   fullpath=False):
    """Append the (key, value) pair(s) of one child to `pathsvalues`.

    A child raising `PropertiesOptionError` is silently left out.

    :param name: name (or relative path) of the child
    :param pathsvalues: list of (key, value) tuples, extended in place
    :param _currpath: list of parent names used to build the key
    :param flatten: use the option's own name as key
    :param config_bag: bag whose `option` is the child to export
    :param fullpath: use `_get_subpath(name)` as key
    """
    try:
        option = config_bag.option
        if not option.impl_is_optiondescription() and option.impl_is_master_slaves('slave'):
            # slave: collect one value per master index
            ret = []
            length = self.cfgimpl_get_length()
            if length:
                for idx in range(length):
                    config_bag.properties = None
                    ret.append(self.getattr(name,
                                            idx,
                                            config_bag))
            elif config_bag.setting_properties:
                # empty master: only the properties are checked
                path = self._get_subpath(name)
                self.cfgimpl_get_settings().validate_properties(path,
                                                                None,
                                                                config_bag)
        else:
            ret = self.getattr(name,
                               None,
                               config_bag)
    except PropertiesOptionError:
        pass
    else:
        if option.impl_is_optiondescription():
            # `ret` is the sub config: recurse into it
            pathsvalues += ret.make_dict(config_bag,
                                         flatten=flatten,
                                         _currpath=_currpath + [name],
                                         fullpath=fullpath)
        else:
            if flatten:
                name = option.impl_getname()
            elif fullpath:
                #FIXME
                #root_path = self.cfgimpl_get_path()
                #if root_path is None:
                #    name = opt.impl_getname()
                #else:
                #    name = '.'.join([root_path, opt.impl_getname()])
                name = self._get_subpath(name)
            else:
                name = '.'.join(_currpath + [name])
            pathsvalues.append((name, ret))
def cfgimpl_get_path(self,
                     dyn=True):
    """Return the path of this sub configuration.

    :param dyn: when false and the description is a dynamic option
                description, the path is looked up in the root description
                from `descr.impl_getopt()` instead of using `_impl_path`
    """
    descr = self.cfgimpl_get_description()
    if dyn or not descr.impl_is_dynoptiondescription():
        return self._impl_path
    root_descr = self._cfgimpl_get_context().cfgimpl_get_description()
    return root_descr.impl_get_path_by_opt(descr.impl_getopt())
class _CommonConfig(SubConfig):
    "abstract base class for the Config, GroupConfig and the MetaConfig"
    # _impl_values:   object the information methods and the value accessors
    #                 delegate to
    # _impl_settings: object returned by cfgimpl_get_settings()
    # _impl_meta:     None, or a callable returning the parent meta config
    #                 (see cfgimpl_get_meta)
    # _impl_test:     undocumented flag used only in test scripts
    __slots__ = ('_impl_values',
                 '_impl_settings',
                 '_impl_meta',
                 '_impl_test')
def _impl_build_all_caches(self,
                           force_store_values):
    """Build the description's caches when they are not built yet.

    :param force_store_values: forwarded to
                               `impl_build_force_store_values`
    """
    descr = self.cfgimpl_get_description()
    if not descr.impl_already_build_caches():
        descr._build_cache_option()
        descr._build_cache(self)
    # NOTE(review): confirm that this call must also run when the
    # description's caches were already built.
    descr.impl_build_force_store_values(self,
                                        force_store_values)
def unwrap_from_path(self,
                     path,
                     index,
                     config_bag):
    """convenience method to extract an Option() object from the Config()

    The option is looked up directly in the description of the config
    owning the last element of `path`.

    When `config_bag.validate_properties` is true, `config_bag.option` is
    also set: to the link's target for a SymLinkOption, to the option
    itself otherwise.

    :param path: option name or dotted path
    :param index: unused by the active code
    :param config_bag: bag forwarded to the lookups (see side effect above)
    :returns: Option()
    """
    if '.' in path:
        self, path = self.cfgimpl_get_home_by_path(path,
                                                   config_bag)
    option = self.cfgimpl_get_description().impl_getchild(path,
                                                          config_bag,
                                                          self)
    if not config_bag.validate_properties:
        return option
    else:
        if option.impl_is_symlinkoption():
            true_option = option.impl_getopt()
            true_path = true_option.impl_getpath(self._cfgimpl_get_context())
            # NOTE(review): the rebound `self`/`path` are only read by the
            # commented-out code below.
            self, path = self.cfgimpl_get_context().cfgimpl_get_home_by_path(true_path,
                                                                             config_bag)
            config_bag.option = true_option
        else:
            true_path = path
            config_bag.option = option
        #if not option.impl_is_optiondescription() and index is None and \
        #        config_bag.option.impl_is_master_slaves('slave'):
        #    subpath = self._get_subpath(true_path)
        #    self.cfgimpl_get_settings().validate_properties(subpath,
        #                                                    index,
        #                                                    config_bag)
        #    return option
        #self.getattr(path,
        #             index,
        #             config_bag,
        #             returns_option=True)
        return option
def cfgimpl_get_path(self, dyn=True):
    """Always return `None`; `dyn` is ignored."""
    root_path = None
    return root_path
def cfgimpl_get_meta(self):
    """Return the object `_impl_meta` refers to.

    :returns: the result of calling `_impl_meta`, or `None` when
              `_impl_meta` is `None`
    """
    meta_ref = self._impl_meta
    if meta_ref is None:
        return None
    return meta_ref()
# information
def impl_set_information(self, key, value):
    """updates the information's attribute

    Delegates to `_impl_values.set_information`.

    :param key: information's key (ex: "help", "doc")
    :param value: information's value (ex: "the help string")
    """
    values = self._impl_values
    values.set_information(key, value)
def impl_get_information(self, key, default=undefined):
    """retrieves one information's item

    Delegates to `_impl_values.get_information`.

    :param key: the item string (ex: "help")
    :param default: forwarded as second argument
    """
    values = self._impl_values
    return values.get_information(key, default)
def impl_del_information(self, key, raises=True):
    """Delete one information item.

    Delegates to `_impl_values.del_information`.

    :param key: the item string
    :param raises: forwarded as second argument
    """
    values = self._impl_values
    values.del_information(key, raises)
def __getstate__(self):
    """Always raise: state extraction is not implemented.

    :raises NotImplementedError: always
    """
    error = NotImplementedError()
    raise error
def _gen_fake_values(self):
    """Build a non-persistent `Config` holding a copy of the values.

    The new config uses the same description and the same settings
    object, with a storage from `get_default_values_storages()` filled
    from ``exportation(fake=True)``.

    :returns: the new `Config`
    """
    fake_config = Config(self._impl_descr,
                         persistent=False,
                         force_values=get_default_values_storages(),
                         force_settings=self.cfgimpl_get_settings())
    fake_storage = fake_config.cfgimpl_get_values()._p_
    exported = self.cfgimpl_get_values()._p_.exportation(fake=True)
    fake_storage.importation(exported)
    return fake_config
def duplicate(self,
              session_id=None,
              force_values=None,
              force_settings=None):
    """Create a new `Config` on the same description and copy state into it.

    Values, modified properties and modified permissives are exported
    from this config and imported into the new one.

    :param session_id: forwarded to `Config`
    :param force_values: forwarded to `Config`
    :param force_settings: forwarded to `Config`
    :returns: the new `Config`
    """
    clone = Config(self._impl_descr,
                   _duplicate=True,
                   session_id=session_id,
                   force_values=force_values,
                   force_settings=force_settings)
    # values
    clone_values = clone.cfgimpl_get_values()._p_
    clone_values.importation(self.cfgimpl_get_values()._p_.exportation())
    # properties
    clone_properties = clone.cfgimpl_get_settings()._p_
    clone_properties.set_modified_properties(
        self.cfgimpl_get_settings()._p_.get_modified_properties())
    # permissives
    clone_permissives = clone.cfgimpl_get_settings()._pp_
    clone_permissives.set_modified_permissives(
        self.cfgimpl_get_settings()._pp_.get_modified_permissives())
    return clone
# ____________________________________________________________
class Config(_CommonConfig):
    "main configuration management entry"
    # Only the names that are new at this level are declared:
    # '_impl_test' is already a slot of _CommonConfig, and re-declaring an
    # inherited slot creates a second descriptor that hides the base one.
    # __weakref__: lets `weakref.ref(self)` be taken in __init__
    # _impl_name:  session id kept as the config name
    __slots__ = ('__weakref__', '_impl_name')
def __init__(self,
             descr,
             session_id=None,
             persistent=False,
             force_values=None,
             force_settings=None,
             _duplicate=False,
             _force_store_values=True):
    """ Configuration option management master class

    :param descr: describes the configuration schema
    :type descr: an instance of ``option.OptionDescription``
    :param session_id: session ID is import with persistent Config to
                       retrieve good session
    :type session_id: `str`
    :param persistent: if persistent, don't delete storage when leaving
    :type persistent: `boolean`
    :param force_values: values storage to use; only honoured together
                         with `force_settings`
    :param force_settings: `Settings` object, or a tuple whose two items
                           are passed to `Settings`; only honoured
                           together with `force_values`
    :param _duplicate: when not `False`, the caches are not built
    :param _force_store_values: forwarded to `_impl_build_all_caches`
    :raises ConfigError: if `descr` is a `MasterSlaves`
    :raises ValueError: if the session ID is not a valid name
    """
    self._impl_meta = None
    if isinstance(descr, MasterSlaves):
        raise ConfigError(_('cannot set MasterSlaves object has root optiondescription'))
    storages_forced = force_settings is not None and force_values is not None
    if storages_forced:
        if isinstance(force_settings, tuple):
            settings = Settings(self,
                                force_settings[0],
                                force_settings[1])
        else:
            settings = force_settings
        self._impl_settings = settings
        self._impl_values = Values(self,
                                   force_values)
    else:
        storages = get_storages(self,
                                session_id,
                                persistent)
        properties, permissives, values, session_id = storages
        if not valid_name(session_id):  # pragma: optional cover
            raise ValueError(_("invalid session ID: {0} for config").format(session_id))
        self._impl_settings = Settings(self,
                                       properties,
                                       permissives)
        self._impl_values = Values(self,
                                   values)
    super(Config, self).__init__(descr,
                                 weakref.ref(self),
                                 ConfigBag(self),
                                 None)
    #undocumented option used only in test script
    self._impl_test = False
    if _duplicate is False and not storages_forced:
        self._impl_build_all_caches(_force_store_values)
    self._impl_name = session_id
def impl_getname(self):
    """Return the config name stored in `_impl_name`."""
    name = self._impl_name
    return name
def impl_getsessionid(self):
    """Return the `session_id` of the values' underlying storage."""
    storage = self._impl_values._p_._storage
    return storage.session_id
class GroupConfig(_CommonConfig):
    """Configuration made of a list of child configurations."""
    # __weakref__:    lets `weakref.ref(self)` be taken in __init__
    # _impl_children: list of child configurations
    # _impl_name:     session id kept as the config name
    __slots__ = ('__weakref__',
                 '_impl_children',
                 '_impl_name')
def __init__(self,
             children,
             session_id=None,
             persistent=False,
             _descr=None):
    """Group the configurations listed in `children`.

    :param children: list of `_CommonConfig` instances with unique names
    :type children: `list`
    :param session_id: forwarded to `get_storages`; the session id it
                       returns becomes this config's name
    :param persistent: forwarded to `get_storages`
    :param _descr: option description handed to the `SubConfig`
                   constructor (may be `None`)
    :raises ValueError: if `children` is not a list or contains something
                        that is not a `_CommonConfig`
    :raises ConflictError: if two children share the same name
    """
    if not isinstance(children, list):
        raise ValueError(_("groupconfig's children must be a list"))
    names = []
    for child in children:
        if not isinstance(child,
                          _CommonConfig):
            raise ValueError(_("groupconfig's children must be Config, MetaConfig or GroupConfig"))
        names.append(child._impl_name)
    if len(names) != len(set(names)):
        # report the first name (in child order) that appears again later;
        # no `xrange` here, it does not exist on Python 3
        for position, name in enumerate(names):
            if name in names[position + 1:]:
                raise ConflictError(_('config name must be uniq in '
                                      'groupconfig for {0}').format(name))
    self._impl_children = children
    properties, permissives, values, session_id = get_storages(self,
                                                               session_id,
                                                               persistent)
    self._impl_settings = Settings(self,
                                   properties,
                                   permissives)
    self._impl_values = Values(self, values)
    self._impl_meta = None
    super(GroupConfig, self).__init__(_descr,
                                      weakref.ref(self),
                                      ConfigBag(self),
                                      None)
    #undocumented option used only in test script
    self._impl_test = False
    self._impl_name = session_id
def cfgimpl_get_children(self):
    """Return the list of child configurations (`_impl_children`)."""
    children = self._impl_children
    return children
def cfgimpl_reset_cache(self,
                        only_expired=False,
                        opt=None,
                        path=None,
                        resetted_opts=None):
    """Reset the cache of every child, and of this config for a MetaConfig.

    Each call receives its own copy of `resetted_opts`.

    :param only_expired: forwarded to each `cfgimpl_reset_cache`
    :param opt: forwarded to each `cfgimpl_reset_cache`
    :param path: forwarded to each `cfgimpl_reset_cache`
    :param resetted_opts: list of already reset options (a new list is
                          created when `None`)
    """
    if resetted_opts is None:
        resetted_opts = []
    if isinstance(self, MetaConfig):
        # only a MetaConfig resets its own cache as well
        own_reset = super(GroupConfig, self).cfgimpl_reset_cache
        own_reset(only_expired=only_expired,
                  opt=opt,
                  path=path,
                  resetted_opts=copy(resetted_opts))
    for sub_config in self._impl_children:
        sub_config.cfgimpl_reset_cache(only_expired=only_expired,
                                       opt=opt,
                                       path=path,
                                       resetted_opts=copy(resetted_opts))
def set_value(self,
              path,
              index,
              value,
              config_bag,
              only_config=False,
              _commit=True):
    """Setattr not in current GroupConfig, but in each children

    `PropertiesOptionError`, `ValueError` and `SlaveError` raised for a
    child are collected instead of being propagated.

    :param path: path of the option to set
    :param index: index forwarded to each child
    :param value: value to set
    :param config_bag: bag forwarded to group children and duplicated
                       with ``copy('nooption')`` for the other children
    :param only_config: forwarded to children that are `GroupConfig`
    :param _commit: if true, commit this config's values storage once all
                    children are processed
    :returns: list of the collected errors
    """
    errors = []
    for sub_config in self._impl_children:
        try:
            if isinstance(sub_config, GroupConfig):
                sub_errors = sub_config.set_value(path,
                                                  index,
                                                  value,
                                                  config_bag,
                                                  only_config=only_config,
                                                  _commit=False)
                errors.extend(sub_errors)
            else:
                sub_bag = config_bag.copy('nooption')
                sub_config.setattr(path,
                                   index,
                                   value,
                                   sub_bag,
                                   _commit=False)
        except PropertiesOptionError as err:
            # a new exception is built from the message and proptype
            errors.append(PropertiesOptionError(str(err), err.proptype))
        except (ValueError, SlaveError) as err:
            errors.append(err)
    if _commit:
        self.cfgimpl_get_values()._p_.commit()
    return errors
def find_firsts(self,
                config_bag,
                byname=None,
                bypath=undefined,
                byoption=undefined,
                byvalue=undefined,
                raise_if_not_found=True,
                _sub=False):
    """Find first not in current GroupConfig, but in each children

    :param config_bag: bag forwarded to every search
    :param byname: filter by option name
    :param bypath: path of the only option to examine in each child
    :param byoption: option matching `bypath`
    :param byvalue: filter by the option's value
    :param raise_if_not_found: forwarded to the lookups of this config
                               (children are searched with `False`)
    :param _sub: if true, return the plain list of matching children
    :returns: a `GroupConfig` built from the matching children, or the
              list itself when `_sub` is true
    """
    ret = []

    #if MetaConfig, all children have same OptionDescription in context
    #so search only one time the option for all children
    if bypath is undefined and byname is not None and \
            isinstance(self,
                       MetaConfig):
        # NOTE(review): `_find` can return None when nothing is found and
        # `raise_if_not_found` is false; `bypath` is then None below.
        bypath = self._find(bytype=None,
                            byvalue=undefined,
                            byname=byname,
                            first=True,
                            config_bag=config_bag,
                            type_='path',
                            raise_if_not_found=raise_if_not_found)
        byname = None
        byoption = self.cfgimpl_get_description().impl_get_opt_by_path(bypath)

    for child in self._impl_children:
        # NOTE(review): `nconfig_bag` is built but never used; the calls
        # below receive `config_bag`.
        nconfig_bag = config_bag.copy('nooption')
        nconfig_bag.option = child
        if isinstance(child, GroupConfig):
            ret.extend(child.find_firsts(byname=byname,
                                         bypath=bypath,
                                         byoption=byoption,
                                         byvalue=byvalue,
                                         config_bag=config_bag,
                                         raise_if_not_found=False,
                                         _sub=True))
        elif child._find(None,
                         byname,
                         byvalue,
                         first=True,
                         type_='path',
                         config_bag=config_bag,
                         raise_if_not_found=False,
                         only_path=bypath,
                         only_option=byoption):
            ret.append(child)
    if _sub:
        return ret
    else:
        # NOTE(review): `_find_return_results` yields None for an empty
        # list when `raise_if_not_found` is false, which GroupConfig
        # rejects as "not a list" -- confirm the expected behaviour.
        return GroupConfig(self._find_return_results(ret,
                                                     raise_if_not_found))
def impl_getname(self):
    """Return the config name stored in `_impl_name`."""
    name = self._impl_name
    return name
# def __str__(self):
# ret = ''
# for child in self._impl_children:
# ret += "({0})\n".format(child._impl_name)
# if self._impl_descr is not None:
# ret += super(GroupConfig, self).__str__()
# return ret
#
# __repr__ = __str__
def getconfig(self,
              name):
    """Return the first child whose `impl_getname()` equals `name`.

    :param name: name of the child configuration
    :raises ConfigError: if no child has this name
    """
    for candidate in self._impl_children:
        if candidate.impl_getname() == name:
            return candidate
    raise ConfigError(_('unknown config {}').format(name))
class MetaConfig(GroupConfig):
    """GroupConfig whose children all share the same option description."""
    # no attribute is added at this level
    __slots__ = tuple()
def __init__(self,
             children,
             session_id=None,
             persistent=False,
             optiondescription=None,
             _force_store_values=True):
    """Attach the children to this meta configuration.

    :param children: list of configurations; when `optiondescription` is
                     given, an iterable of session ids from which the
                     child `Config` objects are created
    :param session_id: forwarded to `GroupConfig.__init__`
    :param persistent: forwarded to `GroupConfig.__init__` and to the
                       created children
    :param optiondescription: description used to create the children
    :param _force_store_values: forwarded to the created children
    :raises TypeError: if a child is not a `_CommonConfig`
    :raises ValueError: if a child already has a meta config, or if the
                        children do not share one option description
    """
    if optiondescription is not None:
        # children are session ids: build one Config per id
        children = [Config(optiondescription,
                           persistent=persistent,
                           session_id=child_session_id,
                           _force_store_values=_force_store_values)
                    for child_session_id in children]
    descr = None
    for child in children:
        if not isinstance(child, _CommonConfig):
            raise TypeError(_("metaconfig's children "
                              "should be config, not {0}"
                              ).format(type(child)))
        if child.cfgimpl_get_meta() is not None:
            raise ValueError(_("child has already a metaconfig's"))
        child_descr = child.cfgimpl_get_description()
        if descr is None:
            descr = child_descr
        elif descr is not child_descr:
            raise ValueError(_('all config in metaconfig must '
                               'have the same optiondescription'))
        child._impl_meta = weakref.ref(self)

    super(MetaConfig, self).__init__(children,
                                     session_id,
                                     persistent,
                                     descr)
def set_value(self,
              path,
              index,
              value,
              config_bag,
              force_default=False,
              force_dont_change_value=False,
              force_default_if_same=False,
              only_config=False,
              _commit=True):
    """Set a value in this MetaConfig, optionally adjusting the children.

    :param path: path of the option
    :param index: index forwarded to `setattr`
    :param value: value to set
    :param config_bag: bag duplicated with ``copy('nooption')`` for each
                       child and for the final `setattr`
    :param force_default: reset the option in every child
    :param force_dont_change_value: store each child's current value in
                                    the child when it differs from `value`
    :param force_default_if_same: reset the option in the children whose
                                  value equals `value`
    :param only_config: could be set if you want modify value in all
                        Config included in this MetaConfig
    :param _commit: forwarded to `setattr` (or to `GroupConfig.set_value`
                    with `only_config`)
    :raises ValueError: on incompatible flags
    :returns: list of the errors collected along the way
    """
    if only_config:
        if force_default or force_default_if_same or force_dont_change_value:
            raise ValueError(_('force_default, force_default_if_same or '
                               'force_dont_change_value cannot be set with'
                               ' only_config'))
        return super(MetaConfig, self).set_value(path,
                                                 index,
                                                 value,
                                                 config_bag,
                                                 only_config=only_config,
                                                 _commit=_commit)
    ret = []
    if force_default or force_default_if_same or force_dont_change_value:
        if force_default and force_dont_change_value:
            raise ValueError(_('force_default and force_dont_change_value'
                               ' cannot be set together'))
        opt = self.cfgimpl_get_description().impl_get_opt_by_path(path)
        for child in self._impl_children:
            nconfig_bag = config_bag.copy('nooption')
            nconfig_bag.option = opt
            if force_default_if_same:
                # `undefined` stands for "no value stored in this child"
                if not child.cfgimpl_get_values()._p_.hasvalue(path):
                    child_value = undefined
                else:
                    child_value = child.getattr(path,
                                                None,
                                                nconfig_bag)
            if force_default or (force_default_if_same and value == child_value):
                child.cfgimpl_get_values().reset(path,
                                                 nconfig_bag,
                                                 _commit=False)
                continue
            if force_dont_change_value:
                try:
                    child_value = child.getattr(path,
                                                None,
                                                nconfig_bag)
                    if value != child_value:
                        # pin the child's current value in the child
                        child.setattr(path,
                                      None,
                                      child_value,
                                      nconfig_bag,
                                      _commit=False)
                except (PropertiesOptionError, ValueError, SlaveError) as err:
                    ret.append(err)

    nconfig_bag = config_bag.copy('nooption')
    try:
        self.setattr(path,
                     index,
                     value,
                     nconfig_bag,
                     _commit=_commit)
    except (PropertiesOptionError, ValueError, SlaveError) as err:
        ret.append(err)
    return ret
def reset(self, path, config_bag):
    """Reset the option at `path` in every child, then in this config.

    `config_bag` is modified: its `option` is set to the option found at
    `path` and its `validate` attribute is set to `False`.

    :param path: path of the option to reset
    :param config_bag: bag used for this config and copied for each child
    """
    #FIXME not working with DynSymLinkOption
    #FIXME does it work with a sub-metaconfig ??
    target = self.cfgimpl_get_description().impl_get_opt_by_path(path)
    config_bag.option = target
    config_bag.validate = False
    for sub_config in self._impl_children:
        sub_bag = config_bag.copy('nooption')
        sub_bag.option = target
        sub_values = sub_config.cfgimpl_get_values()
        sub_values.reset(path,
                         sub_bag,
                         _commit=False)
    own_values = self.cfgimpl_get_values()
    own_values.reset(path,
                     config_bag)
def new_config(self,
               session_id,
               persistent=False):
    """Create a `Config` on this meta config's description and attach it.

    :param session_id: forwarded to `Config`
    :param persistent: forwarded to `Config`
    :raises ConflictError: if a child with the same name already exists
    :returns: the new child `Config`
    """
    new_child = Config(self._impl_descr,
                       session_id=session_id,
                       persistent=persistent)

    known_names = [child._impl_name for child in self._impl_children]
    if new_child._impl_name in known_names:  # pragma: no cover
        raise ConflictError(_('config name must be uniq in '
                              'groupconfig for {0}').format(new_child._impl_name))
    new_child._impl_meta = weakref.ref(self)
    self._impl_children.append(new_child)
    return new_child