# -*- coding: utf-8 -*-
# Copyright (C) 2012-2017 Team tiramisu (see AUTHORS for all contributors)
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by the
# Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# The original `Config` design model is unproudly borrowed from
# the rough pypy's guys: http://codespeak.net/svn/pypy/dist/pypy/config/
# the whole pypy project is under MIT licence
# ____________________________________________________________
"options handler global entry point"
import weakref
from time import time
from copy import copy


from .error import PropertiesOptionError, ConfigError, ConflictError, SlaveError
from .option.syndynoptiondescription import SynDynOptionDescription
from .option.baseoption import BaseOption, valid_name
from .setting import ConfigBag, groups, Settings, undefined
from .storage import get_storages, get_default_values_storages
from .value import Values  # , Multi
from .i18n import _


class SubConfig(object):
    """Sub configuration management entry.

    Mirrors one OptionDescription of the option tree. SubConfig objects are
    generated on demand (see `getattr`). A Config is also a SubConfig.
    The root Config is called "context" below.
    """
    __slots__ = ('_impl_context',
                 '_impl_descr',
                 '_impl_path',
                 '_impl_length')

    def __init__(self,
                 descr,
                 context,
                 config_bag,
                 subpath=None):
        """Configuration option management master class

        :param descr: describes the configuration schema
        :type descr: an instance of ``option.OptionDescription`` (or None)
        :param context: weak reference to the current root config
        :type context: `weakref.ref` to a `Config`
        :param config_bag: bag read for `setting_properties` and copied to
                           fetch the master value of a master group
        :param subpath: path of this sub configuration
        :type subpath: `str` with the path name
        :raises TypeError: if `descr` is not an option description
        :raises ValueError: if `context` is not a weak reference
        """
        # main option description
        if descr is not None and \
                (not isinstance(descr, (BaseOption, SynDynOptionDescription)) or
                 not descr.impl_is_optiondescription()):
            raise TypeError(_('descr must be an optiondescription, not {0}'
                              ).format(type(descr)))
        self._impl_descr = descr
        # sub option descriptions
        if not isinstance(context, weakref.ReferenceType):  # pragma: optional cover
            raise ValueError('context must be a Weakref')
        self._impl_context = context
        self._impl_path = subpath
        # for a master group, remember the length of the master value;
        # `_impl_length` stays unset for every other description
        if config_bag.setting_properties is not None and \
                descr.impl_get_group_type() == groups.master:
            master = descr.getmaster()
            masterpath = master.impl_getname()
            mconfig_bag = config_bag.copy('nooption')
            mconfig_bag.option = master
            value = self.getattr(masterpath,
                                 None,
                                 mconfig_bag)
            self._impl_length = len(value)

    def cfgimpl_get_length(self):
        """Return the master length stored by `__init__`.

        :raises AttributeError: if no length was stored for this SubConfig
        """
        return self._impl_length

    def reset_one_option_cache(self,
                               values,
                               settings,
                               resetted_opts,
                               opt,
                               path):
        """Reset the values and settings cache of `opt`, then of the options
        returned by `opt._get_dependencies(self)` (recursively).

        :param resetted_opts: list of options already handled; it is extended
                              in place and used to skip dependencies
        """
        tresetted_opts = copy(resetted_opts)
        opt.reset_cache(opt,
                        path,
                        values,
                        'values',
                        tresetted_opts)
        tresetted_opts = copy(resetted_opts)
        opt.reset_cache(opt,
                        path,
                        settings,
                        'settings',
                        tresetted_opts)
        resetted_opts.extend(tresetted_opts)
        for woption in opt._get_dependencies(self):
            # dependencies are callables returning the option
            # (presumably weak references -- TODO confirm)
            option = woption()
            if option in resetted_opts:
                continue
            option_path = option.impl_getpath(self)
            self.reset_one_option_cache(values,
                                        settings,
                                        resetted_opts,
                                        option,
                                        option_path)
            del option

    def cfgimpl_reset_cache(self,
                            only_expired=False,
                            opt=None,
                            path=None,
                            resetted_opts=None):
        """reset all settings in cache

        When both `opt` and `path` are given only this option (and its
        dependencies) is reset, unless `opt` is already in `resetted_opts`.

        :param only_expired: if True reset only expired cached values
        :type only_expired: boolean
        """
        if resetted_opts is None:
            resetted_opts = []

        context = self._cfgimpl_get_context()
        values = context.cfgimpl_get_values()
        settings = context.cfgimpl_get_settings()

        if None not in (opt, path):
            if opt not in resetted_opts:
                self.reset_one_option_cache(values,
                                            settings,
                                            resetted_opts,
                                            opt,
                                            path)
        elif only_expired:
            # reset cache for expired cache value only
            datetime = int(time())
            values._p_.reset_expired_cache(datetime)
            settings._p_.reset_expired_cache(datetime)
        else:
            values._p_.reset_all_cache()
            settings._p_.reset_all_cache()

    def cfgimpl_get_home_by_path(self,
                                 path,
                                 config_bag):
        """Walk every step of the dotted `path` but the last one.

        :returns: tuple (config, name) where `config` is the sub
                  configuration reached and `name` the last path step
        """
        path = path.split('.')
        # `self` is deliberately rebound to each intermediate sub config
        for step in path[:-1]:
            sconfig_bag = config_bag.copy('nooption')
            self = self.getattr(step,
                                None,
                                sconfig_bag)
        return self, path[-1]

    # ______________________________________________________________________
    # def __iter__(self, force_permissive=False):
    #     """Pythonesque way of parsing group's ordered options.
    #     iteration only on Options (not OptionDescriptions)"""
    #     setting_properties = self.cfgimpl_get_context().cfgimpl_get_settings().get_context_properties()
    #     for child in self.cfgimpl_get_description().impl_getchildren(context=self._cfgimpl_get_context()):
    #         if not child.impl_is_optiondescription():
    #             try:
    #                 name = child.impl_getname()
    #                 yield name, self.getattr(name,
    #                                          force_permissive=force_permissive,
    #                                          setting_properties=setting_properties)
    #             except GeneratorExit:  # pragma: optional cover
    #                 if sys.version_info[0] < 3:
    #                     raise StopIteration
    #                 else:
    #                     raise GeneratorExit()
    #             except PropertiesOptionError:  # pragma: optional cover
    #                 pass  # option with properties
    #
    # def iter_all(self, force_permissive=False):
    #     """A way of parsing options **and** groups.
    #     iteration on Options and OptionDescriptions."""
    #     for child in self.cfgimpl_get_description().impl_getchildren():
    #         try:
    #             yield child.impl_getname(), self.getattr(child.impl_getname(),
    #                                                      force_permissive=force_permissive)
    #         except GeneratorExit:  # pragma: optional cover
    #             if sys.version_info[0] < 3:
    #                 raise StopIteration
    #             else:
    #                 raise GeneratorExit()
    #         except PropertiesOptionError:  # pragma: optional cover
    #             pass  # option with properties

    def iter_groups(self,
                    setting_properties,
                    group_type=None,
                    force_permissive=False):
        """iteration on groups objects only.
        All groups are returned if `group_type` is `None`, otherwise the groups
        can be filtered by categories (families, or whatever).

        :param group_type: if defined, is an instance of `groups.GroupType`
                           or `groups.MasterGroupType` that lives in
                           `setting.groups`
        """
        # NOTE(review): `config_bag` is not defined in this scope and the
        # `self.getattr` call below still uses the former keyword signature
        # (`force_permissive`, `setting_properties`); as written this generator
        # fails on first iteration. Left untouched because the intended
        # signature cannot be determined from this file.
        if group_type is not None and not isinstance(group_type,
                                                     groups.GroupType):  # pragma: optional cover
            raise TypeError(_("unknown group_type: {0}").format(group_type))
        context = self._cfgimpl_get_context()
        for child in self.cfgimpl_get_description().impl_getchildren(config_bag):
            if child.impl_is_optiondescription():
                try:
                    if group_type is None or \
                            child.impl_get_group_type() == group_type:
                        name = child.impl_getname()
                        yield name, self.getattr(name,
                                                 force_permissive=force_permissive,
                                                 setting_properties=setting_properties)
                except PropertiesOptionError:  # pragma: optional cover
                    pass

    # ______________________________________________________________________
    # def __str__(self):
    #     "Config's string representation"
    #     lines = []
    #     for name, grp in self.iter_groups():
    #         lines.append("[{0}]".format(name))
    #     for name, value in self:
    #         try:
    #             lines.append("{0} = {1}".format(name, value))
    #         except UnicodeEncodeError:  # pragma: optional cover
    #             lines.append("{0} = {1}".format(name,
    #                                             value.encode(default_encoding)))
    #     return '\n'.join(lines)
    #
    # __repr__ = __str__

    def _cfgimpl_get_context(self):
        """Return the root config behind the weak reference.

        context could be None, we need to test it
        context is None only if all reference to `Config` object is deleted
        (for example we delete a `Config` and we manipulate a reference to
        old `SubConfig`, `Values`, `Multi` or `Settings`)

        :raises ConfigError: if the root config does not exist anymore
        """
        context = self._impl_context()
        if context is None:  # pragma: optional cover
            raise ConfigError(_('the context does not exist anymore'))
        return context

    def cfgimpl_get_context(self):
        """Public alias of `_cfgimpl_get_context`."""
        return self._cfgimpl_get_context()

    def cfgimpl_get_description(self):
        """Return the option description of this config.

        :raises ConfigError: if no description is attached
        """
        if self._impl_descr is None:  # pragma: optional cover
            raise ConfigError(_('no option description found for this config'
                                ' (may be GroupConfig)'))
        else:
            return self._impl_descr

    def cfgimpl_get_settings(self):
        """Return the `_impl_settings` object of the root config."""
        return self._cfgimpl_get_context()._impl_settings

    def cfgimpl_get_values(self):
        """Return the `_impl_values` object of the root config."""
        return self._cfgimpl_get_context()._impl_values

    def setattr(self,
                name,
                index,
                value,
                config_bag,
                _commit=True):
        """Set `value` for the option `name` (a name or a dotted path).

        Names starting with ``_impl_`` are stored as plain attributes.

        :raises TypeError: if `name` designates an OptionDescription or a
                           SymLinkOption
        """
        if name.startswith('_impl_'):
            return object.__setattr__(self,
                                      name,
                                      value)
        # fails early if the root config is gone
        context = self._cfgimpl_get_context()
        if '.' in name:  # pragma: optional cover
            # `self` is rebound to the sub config that owns the option
            self, name = self.cfgimpl_get_home_by_path(name,
                                                       config_bag)
        child = self.cfgimpl_get_description().impl_getchild(name,
                                                             config_bag,
                                                             self)
        if child.impl_is_optiondescription() or isinstance(child, SynDynOptionDescription):
            raise TypeError(_("can't assign to an OptionDescription"))  # pragma: optional cover
        elif child.impl_is_symlinkoption():
            raise TypeError(_("can't assign to a SymLinkOption"))
        else:
            subpath = self._get_subpath(name)
            if config_bag.setting_properties:
                self.cfgimpl_get_settings().validate_properties(subpath,
                                                                index,
                                                                config_bag)
            self.cfgimpl_get_description().impl_validate_value(child,
                                                               value,
                                                               self)
            return self.cfgimpl_get_values().setvalue(subpath,
                                                      index,
                                                      value,
                                                      config_bag,
                                                      _commit)

    def delattr(self,
                name,
                index,
                config_bag):
        """Reset the value of the option `name` (a name or a dotted path).

        :param index: if not None, only this index of a master or a slave
                      is reset
        :raises TypeError: if `name` designates an OptionDescription or a
                           SymLinkOption
        :raises ValueError: if `index` is given for an option that is
                            neither a master nor a slave
        """
        # fails early if the root config is gone
        context = self._cfgimpl_get_context()
        if '.' in name:  # pragma: optional cover
            # `self` is rebound to the sub config that owns the option
            self, name = self.cfgimpl_get_home_by_path(name,
                                                       config_bag)
        if config_bag.option is None:
            config_bag.option = self.cfgimpl_get_description().impl_getchild(name,
                                                                             config_bag,
                                                                             self)
        option = config_bag.option
        if option.impl_is_optiondescription() or isinstance(option, SynDynOptionDescription):
            raise TypeError(_("can't delete an OptionDescription"))  # pragma: optional cover
        elif option.impl_is_symlinkoption():
            raise TypeError(_("can't delete a SymLinkOption"))
        subpath = self._get_subpath(name)
        values = self.cfgimpl_get_values()
        if index is not None:
            if option.impl_is_master_slaves('master'):
                values.reset_master(self,
                                    subpath,
                                    index,
                                    config_bag)
            elif option.impl_is_master_slaves('slave'):
                values.reset_slave(subpath,
                                   index,
                                   config_bag)
            else:
                raise ValueError(_("can delete value with index only with a master or a slave"))
        else:
            values.reset(subpath,
                         config_bag)

    def _get_subpath(self, name):
        """Return `name` prefixed by the path of this config, if any."""
        if self._impl_path is None:
            subpath = name
        else:
            subpath = self._impl_path + '.' + name
        return subpath

    def getattr(self,
                name,
                index,
                config_bag,
                returns_option=False):
        """
        attribute notation mechanism for accessing the value of an option

        :param name: attribute name (or dotted path)
        :param index: index of the value, mandatory for a slave
        :param config_bag: bag carrying the option (looked up and stored in
                           the bag when `config_bag.option` is None) and the
                           properties settings
        :param returns_option: if True return the option object instead of
                               its value
        :return: option's value if name is an option name, a `SubConfig`
                 if name is an OptionDescription
        :raises IndexError: if `index` is missing or too high for a slave,
                            or given (and truthy) for another option
        :raises SlaveError: if a slave is longer than its master
        """
        if '.' in name:
            # `self` is rebound to the sub config that owns the option
            self, name = self.cfgimpl_get_home_by_path(name,
                                                       config_bag)

        context = self._cfgimpl_get_context()
        option = config_bag.option
        if option is None:
            option = self.cfgimpl_get_description().impl_getchild(name,
                                                                  config_bag,
                                                                  self)
            config_bag.option = option
        if option.impl_is_symlinkoption():
            if returns_option is True:
                return option
            # follow the link: read the real option from the root config
            opt = option.impl_getopt()
            path = context.cfgimpl_get_description().impl_get_path_by_opt(opt)
            sconfig_bag = config_bag.copy('nooption')
            sconfig_bag.ori_option = option
            sconfig_bag.option = opt
            return context.getattr(path,
                                   index,
                                   sconfig_bag)

        subpath = self._get_subpath(name)
        if config_bag.setting_properties:
            self.cfgimpl_get_settings().validate_properties(subpath,
                                                            index,
                                                            config_bag)
        if option.impl_is_optiondescription():
            if returns_option is True:
                return option
            return SubConfig(option,
                             self._impl_context,
                             config_bag,
                             subpath)

        if option.impl_is_master_slaves('slave'):
            if index is None:
                raise IndexError(_('index is mandatory for the slave "{}"'
                                   '').format(subpath))
            length = self.cfgimpl_get_length()
            if index >= length:
                raise IndexError(_('index ({}) is higher than the master length ({}) '
                                   'for "{}"').format(index,
                                                      length,
                                                      option.impl_get_display_name()))
            slave_len = self.cfgimpl_get_values()._p_.get_max_length(subpath)
            if slave_len > length:
                raise SlaveError(_('slave option "{}" has higher length ({}) than the master length ({})'
                                   '').format(option.impl_get_display_name(),
                                              slave_len,
                                              length,
                                              subpath))
        elif index:
            # NOTE(review): a truthiness test, so index 0 is accepted here
            raise IndexError(_('index is forbidden for the not slave "{}"'
                               '').format(subpath))
        # FIXME already done in get_cached_value
        # if config_bag.validate:
        #     option.impl_validate(context,
        #                          config_bag)
        value = self.cfgimpl_get_values().get_cached_value(subpath,
                                                           index,
                                                           config_bag)
        if config_bag.validate_properties:
            self.cfgimpl_get_settings().validate_mandatory(subpath,
                                                           index,
                                                           value,
                                                           config_bag)
        # FIXME use the config_bag!
        if returns_option is True:
            return option
        return value

    def find(self,
             config_bag,
             bytype=None,
             byname=None,
             byvalue=undefined,
             type_='option'):
        """
        finds a list of options recursively in the config

        :param bytype: Option class (BoolOption, StrOption, ...)
        :param byname: filter by Option.impl_getname()
        :param byvalue: filter by the option's value
        :param type_: 'option', 'path' or 'value': what is collected
        :returns: list of matching Option objects (or paths, or values)
        """
        return self._cfgimpl_get_context()._find(bytype,
                                                 byname,
                                                 byvalue,
                                                 config_bag,
                                                 first=False,
                                                 type_=type_,
                                                 _subpath=self.cfgimpl_get_path(False))

    def find_first(self,
                   config_bag,
                   bytype=None,
                   byname=None,
                   byvalue=undefined,
                   type_='option',
                   raise_if_not_found=True):
        """
        finds an option recursively in the config

        :param bytype: Option class (BoolOption, StrOption, ...)
        :param byname: filter by Option.impl_getname()
        :param byvalue: filter by the option's value
        :param type_: 'option', 'path' or 'value': what is returned
        :returns: the first matching Option object (or path, or value);
                  None if nothing matches and `raise_if_not_found` is False
        """
        return self._cfgimpl_get_context()._find(bytype,
                                                 byname,
                                                 byvalue,
                                                 config_bag,
                                                 first=True,
                                                 type_=type_,
                                                 _subpath=self.cfgimpl_get_path(False),
                                                 raise_if_not_found=raise_if_not_found)

    def _find(self,
              bytype,
              byname,
              byvalue,
              config_bag,
              first,
              type_='option',
              _subpath=None,
              raise_if_not_found=True,
              only_path=undefined,
              only_option=undefined):
        """
        convenience method for finding an option that lives only in the subtree

        :param first: return only one option if True, a list otherwise
        :param only_path: if defined, the search is restricted to the single
                          couple (`only_path`, `only_option`)
        :return: find list or an exception if nothing has been found
        :raises ValueError: if `type_` is unknown
        """
        def _filter_by_value(sconfig_bag):
            # `path` is the loop variable of the `for` below
            if byvalue is undefined:
                return True
            try:
                value = self.getattr(path,
                                     None,
                                     sconfig_bag)
            except PropertiesOptionError:
                return False
            if isinstance(value, list):
                return byvalue in value
            else:
                return value == byvalue

        if type_ not in ('option', 'path', 'value'):  # pragma: optional cover
            raise ValueError(_('unknown type_ type {0}'
                               'for _find').format(type_))
        find_results = []
        # if value and/or validate_properties are set, need all available
        # options: if the first one has no good value or not good property,
        # check the second one and so on
        only_first = first is True and byvalue is undefined and \
            config_bag.validate_properties is False
        if only_path is not undefined:
            options = [(only_path, only_option)]
        else:
            options = self.cfgimpl_get_description().impl_get_options_paths(bytype,
                                                                            byname,
                                                                            _subpath,
                                                                            only_first,
                                                                            config_bag)
        for path, option in options:
            sconfig_bag = config_bag.copy('nooption')
            sconfig_bag.option = option
            if not _filter_by_value(sconfig_bag):
                continue
            # remove option with propertyerror, ...
            if config_bag.validate_properties:
                try:
                    self.unwrap_from_path(path,
                                          None,
                                          config_bag)
                except PropertiesOptionError:
                    continue
            if type_ == 'value':
                retval = self.getattr(path,
                                      None,
                                      config_bag)
            elif type_ == 'path':
                retval = path
            elif type_ == 'option':
                retval = option
            if first:
                return retval
            else:
                find_results.append(retval)
        return self._find_return_results(find_results,
                                         raise_if_not_found)

    def _find_return_results(self,
                             find_results,
                             raise_if_not_found):
        """Return `find_results` if it is not empty.

        When it is empty, raise AttributeError if `raise_if_not_found` is
        true, otherwise return None (implicitly).
        """
        if find_results == []:  # pragma: optional cover
            if raise_if_not_found:
                raise AttributeError(_("no option found in config"
                                       " with these criteria"))
        else:
            return find_results

    def make_dict(self,
                  config_bag,
                  flatten=False,
                  _currpath=None,
                  withoption=None,
                  withvalue=undefined,
                  fullpath=False):
        """exports the whole config into a `dict`, for example:

        >>> print(cfg.make_dict())
        {'od2.var4': None, 'od2.var5': None, 'od2.var6': None}

        :param flatten: returns a dict(name=value) instead of a dict(path=value)
            ::

                >>> print(cfg.make_dict(flatten=True))
                {'var5': None, 'var4': None, 'var6': None}

        :param withoption: returns the options that are present in the very same
                           `OptionDescription` than the `withoption` itself::

                >>> print(cfg.make_dict(withoption='var1'))
                {'od2.var4': None, 'od2.var5': None,
                'od2.var6': None,
                'od2.var1': u'value',
                'od1.var1': None,
                'od1.var3': None,
                'od1.var2': None}

        :param withvalue: returns the options that have the value `withvalue`
            ::

                >>> print(c.make_dict(withoption='var1',
                                      withvalue=u'value'))
                {'od2.var4': None,
                'od2.var5': None,
                'od2.var6': None,
                'od2.var1': u'value'}

        :returns: dict of Option's name (or path) and values; a list of
                  (name, value) tuples when `_currpath` is not empty
                  (recursive calls)
        :raises ValueError: if `withvalue` is given without `withoption`
        """
        pathsvalues = []
        if _currpath is None:
            _currpath = []
        if withoption is None and withvalue is not undefined:  # pragma: optional cover
            raise ValueError(_("make_dict can't filtering with value without "
                               "option"))
        if withoption is not None:
            context = self._cfgimpl_get_context()
            for path in context._find(bytype=None,
                                      byname=withoption,
                                      byvalue=withvalue,
                                      first=False,
                                      type_='path',
                                      _subpath=self.cfgimpl_get_path(False),
                                      config_bag=config_bag):
                # keep the path of the parent of the option found
                path = '.'.join(path.split('.')[:-1])
                opt = context.unwrap_from_path(path,
                                               None,
                                               config_bag)
                sconfig_bag = config_bag.copy('nooption')
                sconfig_bag.option = opt
                mypath = self.cfgimpl_get_path()
                if mypath is not None:
                    if mypath == path:
                        # the parent is this config: export it entirely
                        withoption = None
                        withvalue = undefined
                        break
                    else:
                        tmypath = mypath + '.'
                        if not path.startswith(tmypath):  # pragma: optional cover
                            raise AttributeError(_('unexpected path {0}, '
                                                   'should start with {1}'
                                                   '').format(path, mypath))
                        path = path[len(tmypath):]
                self._make_sub_dict(path,
                                    pathsvalues,
                                    _currpath,
                                    flatten,
                                    sconfig_bag,
                                    fullpath=fullpath)
        # withoption may have been reset to None by the loop above!
        if withoption is None:
            for opt in self.cfgimpl_get_description().impl_getchildren(config_bag):
                sconfig_bag = config_bag.copy('nooption')
                sconfig_bag.option = opt
                path = opt.impl_getname()
                self._make_sub_dict(path,
                                    pathsvalues,
                                    _currpath,
                                    flatten,
                                    sconfig_bag,
                                    fullpath=fullpath)
        if _currpath == []:
            options = dict(pathsvalues)
            return options
        return pathsvalues

    def _make_sub_dict(self,
                       name,
                       pathsvalues,
                       _currpath,
                       flatten,
                       config_bag,
                       fullpath=False):
        """Append to `pathsvalues` the (name, value) couple of the option
        stored in `config_bag.option`, or the content of its sub config.

        Options raising PropertiesOptionError are silently skipped.
        """
        try:
            option = config_bag.option
            if not option.impl_is_optiondescription() and option.impl_is_master_slaves('slave'):
                # a slave is read index by index
                ret = []
                length = self.cfgimpl_get_length()
                if length:
                    for idx in range(length):
                        config_bag.properties = None
                        ret.append(self.getattr(name,
                                                idx,
                                                config_bag))
                elif config_bag.setting_properties:
                    # nothing to read, but properties are still validated
                    path = self._get_subpath(name)
                    self.cfgimpl_get_settings().validate_properties(path,
                                                                    None,
                                                                    config_bag)
            else:
                ret = self.getattr(name,
                                   None,
                                   config_bag)
        except PropertiesOptionError:
            pass
        else:
            if option.impl_is_optiondescription():
                pathsvalues += ret.make_dict(config_bag,
                                             flatten=flatten,
                                             _currpath=_currpath + [name],
                                             fullpath=fullpath)
            else:
                if flatten:
                    name = option.impl_getname()
                elif fullpath:
                    # FIXME
                    # root_path = self.cfgimpl_get_path()
                    # if root_path is None:
                    #     name = opt.impl_getname()
                    # else:
                    #     name = '.'.join([root_path, opt.impl_getname()])
                    name = self._get_subpath(name)
                else:
                    name = '.'.join(_currpath + [name])
                pathsvalues.append((name, ret))

    def cfgimpl_get_path(self, dyn=True):
        """Return the path of this sub config.

        :param dyn: if False and the description is a dynamic option
                    description, return the path (in the root description)
                    of the option returned by `impl_getopt()` instead
        """
        descr = self.cfgimpl_get_description()
        if not dyn and descr.impl_is_dynoptiondescription():
            context_descr = self._cfgimpl_get_context().cfgimpl_get_description()
            return context_descr.impl_get_path_by_opt(descr.impl_getopt())
        return self._impl_path


class _CommonConfig(SubConfig):
    "abstract base class for the Config, GroupConfig and the MetaConfig"
    __slots__ = ('_impl_values',
                 '_impl_settings',
                 '_impl_meta',
                 '_impl_test')

    def _impl_build_all_caches(self,
                               force_store_values):
        """Build the caches of the description (only if not already built)
        and call `impl_build_force_store_values` on it."""
        descr = self.cfgimpl_get_description()
        if not descr.impl_already_build_caches():
            descr._build_cache_option()
            descr._build_cache(self)
        descr.impl_build_force_store_values(self,
                                            force_store_values)

    def unwrap_from_path(self,
                         path,
                         index,
                         config_bag):
        """convenience method to extract and Option() object from the Config()
        and it is **fast**: finds the option directly in the appropriate
        namespace

        When `config_bag.validate_properties` is true, `config_bag.option`
        is also set (to the linked option for a SymLinkOption).

        :returns: Option()
        """
        if '.' in path:
            # `self` is rebound to the sub config that owns the option
            self, path = self.cfgimpl_get_home_by_path(path,
                                                       config_bag)
        option = self.cfgimpl_get_description().impl_getchild(path,
                                                              config_bag,
                                                              self)
        if not config_bag.validate_properties:
            return option
        else:
            if option.impl_is_symlinkoption():
                true_option = option.impl_getopt()
                true_path = true_option.impl_getpath(self._cfgimpl_get_context())
                self, path = self.cfgimpl_get_context().cfgimpl_get_home_by_path(true_path,
                                                                                 config_bag)
                config_bag.option = true_option
            else:
                true_path = path
                config_bag.option = option
            # if not option.impl_is_optiondescription() and index is None and \
            #         config_bag.option.impl_is_master_slaves('slave'):
            #     subpath = self._get_subpath(true_path)
            #     self.cfgimpl_get_settings().validate_properties(subpath,
            #                                                     index,
            #                                                     config_bag)
            #     return option
            # self.getattr(path,
            #              index,
            #              config_bag,
            #              returns_option=True)
            return option

    def cfgimpl_get_path(self, dyn=True):
        """A root config has no path: always return None."""
        return None

    def cfgimpl_get_meta(self):
        """Return the object behind `_impl_meta`, or None if it is not set."""
        if self._impl_meta is not None:
            return self._impl_meta()

    # information
    def impl_set_information(self, key, value):
        """updates the information's attribute

        :param key: information's key (ex: "help", "doc")
        :param value: information's value (ex: "the help string")
        """
        self._impl_values.set_information(key, value)

    def impl_get_information(self, key, default=undefined):
        """retrieves one information's item

        :param key: the item string (ex: "help")
        :param default: passed as is to `Values.get_information`
        """
        return self._impl_values.get_information(key, default)

    def impl_del_information(self, key, raises=True):
        """deletes one information's item

        :param key: the item string (ex: "help")
        :param raises: passed as is to `Values.del_information`
        """
        self._impl_values.del_information(key, raises)

    def __getstate__(self):
        """Not implemented: always raises NotImplementedError."""
        raise NotImplementedError()

    def _gen_fake_values(self):
        """Return a new, not persistent, Config on the same description and
        settings, whose values are imported from the current values
        (exported with ``fake=True``)."""
        fake_config = Config(self._impl_descr,
                             persistent=False,
                             force_values=get_default_values_storages(),
                             force_settings=self.cfgimpl_get_settings())
        fake_config.cfgimpl_get_values()._p_.importation(self.cfgimpl_get_values()._p_.exportation(fake=True))
        return fake_config

    def duplicate(self,
                  session_id=None,
                  force_values=None,
                  force_settings=None):
        """Return a new Config on the same description, with the values,
        modified properties and modified permissives of this config
        copied into it."""
        config = Config(self._impl_descr,
                        _duplicate=True,
                        session_id=session_id,
                        force_values=force_values,
                        force_settings=force_settings)
        own_settings = self.cfgimpl_get_settings()
        new_settings = config.cfgimpl_get_settings()
        config.cfgimpl_get_values()._p_.importation(self.cfgimpl_get_values()._p_.exportation())
        new_settings._p_.set_modified_properties(own_settings._p_.get_modified_properties())
        new_settings._pp_.set_modified_permissives(own_settings._pp_.get_modified_permissives())
        return config


# ____________________________________________________________
class Config(_CommonConfig):
    "main configuration management entry"
    # `_impl_test` is already a slot of _CommonConfig: declaring it again
    # here would only shadow the inherited slot
    __slots__ = ('__weakref__',
                 '_impl_name')

    def __init__(self,
                 descr,
                 session_id=None,
                 persistent=False,
                 force_values=None,
                 force_settings=None,
                 _duplicate=False,
                 _force_store_values=True):
        """ Configuration option management master class

        :param descr: describes the configuration schema
        :type descr: an instance of ``option.OptionDescription``
        :param session_id: session ID is import with persistent Config to
                           retrieve good session
        :type session_id: `str`
        :param persistent: if persistent, don't delete storage when leaving
        :type persistent: `boolean`
        :param force_values: storage used to build `Values`; only used when
                             `force_settings` is given too
        :param force_settings: a `Settings` object, or a tuple
                               (properties, permissives) used to build one
        :raises ValueError: if the session ID is not a valid name
        """
        self._impl_meta = None
        if force_settings is not None and force_values is not None:
            if isinstance(force_settings, tuple):
                self._impl_settings = Settings(self,
                                               force_settings[0],
                                               force_settings[1])
            else:
                self._impl_settings = force_settings
            self._impl_values = Values(self,
                                       force_values)
        else:
            properties, permissives, values, session_id = get_storages(self,
                                                                       session_id,
                                                                       persistent)
            if not valid_name(session_id):  # pragma: optional cover
                raise ValueError(_("invalid session ID: {0} for config").format(session_id))
            self._impl_settings = Settings(self,
                                           properties,
                                           permissives)
            self._impl_values = Values(self,
                                       values)
        super(Config, self).__init__(descr,
                                     weakref.ref(self),
                                     ConfigBag(self),
                                     None)
        # undocumented option used only in test script
        self._impl_test = False
        if _duplicate is False and (force_settings is None or force_values is None):
            self._impl_build_all_caches(_force_store_values)
        self._impl_name = session_id

    def impl_getname(self):
        """Return the name of the config (the session ID given to or
        computed by `__init__`)."""
        return self._impl_name

    def impl_getsessionid(self):
        """Return the session ID of the values storage."""
        return self._impl_values._p_._storage.session_id


class GroupConfig(_CommonConfig):
    """Config grouping several children configs.

    NOTE(review): `__init__`, `set_value` and `find_firsts` still call
    `SubConfig.__init__`, `setattr` and `_find` with arguments that do not
    match the signatures defined in this module (no `config_bag`,
    `not_raises`, `check_properties`); these calls look unfinished and have
    been left untouched.
    """
    __slots__ = ('__weakref__',
                 '_impl_children',
                 '_impl_name')

    def __init__(self,
                 children,
                 session_id=None,
                 persistent=False,
                 _descr=None):
        """
        :param children: configs to group
        :type children: `list` of Config, MetaConfig or GroupConfig
        :raises ValueError: if `children` is not a list of configs
        :raises ConflictError: if two children have the same name
        """
        if not isinstance(children, list):
            raise ValueError(_("groupconfig's children must be a list"))
        names = []
        for child in children:
            if not isinstance(child, _CommonConfig):
                raise ValueError(_("groupconfig's children must be Config, MetaConfig or GroupConfig"))
            names.append(child._impl_name)
        if len(names) != len(set(names)):
            # report the first name that appears again later in the list
            # (`enumerate` replaces the Python 2 only `xrange` loop)
            for position, name in enumerate(names):
                if name in names[position + 1:]:
                    raise ConflictError(_('config name must be uniq in '
                                          'groupconfig for {0}').format(name))
        self._impl_children = children
        properties, permissives, values, session_id = get_storages(self,
                                                                   session_id,
                                                                   persistent)
        self._impl_settings = Settings(self,
                                       properties,
                                       permissives)
        self._impl_values = Values(self,
                                   values)
        # NOTE(review): SubConfig.__init__ accepts (descr, context,
        # config_bag, subpath); this call passes one argument too many
        super(GroupConfig, self).__init__(_descr,
                                          weakref.ref(self),
                                          undefined,
                                          True,
                                          False)
        self._impl_meta = None
        # undocumented option used only in test script
        self._impl_test = False
        self._impl_name = session_id

    def cfgimpl_get_children(self):
        """Return the list of children configs."""
        return self._impl_children

    def cfgimpl_reset_cache(self,
                            only_expired=False,
                            opt=None,
                            path=None,
                            resetted_opts=None):
        """Reset the cache of every child and, for a MetaConfig, of the
        config itself. Each call receives its own copy of `resetted_opts`."""
        if resetted_opts is None:
            resetted_opts = []
        if isinstance(self, MetaConfig):
            super(GroupConfig, self).cfgimpl_reset_cache(only_expired=only_expired,
                                                         opt=opt,
                                                         path=path,
                                                         resetted_opts=copy(resetted_opts))
        for child in self._impl_children:
            child.cfgimpl_reset_cache(only_expired=only_expired,
                                      opt=opt,
                                      path=path,
                                      resetted_opts=copy(resetted_opts))

    def set_value(self,
                  path,
                  value,
                  _commit=True):
        """Setattr not in current GroupConfig, but in each children

        :returns: list of the non-None results returned by the children
        """
        ret = []
        for child in self._impl_children:
            if isinstance(child, MetaConfig):
                ret.extend(child.set_value(path,
                                           value,
                                           only_config=True,
                                           _commit=False))
            elif isinstance(child, GroupConfig):
                ret.extend(child.set_value(path,
                                           value,
                                           _commit=False))
            else:
                childret = child.setattr(path,
                                         value,
                                         not_raises=True,
                                         _commit=False)
                if childret is not None:
                    ret.append(childret)
        if _commit:
            self.cfgimpl_get_values()._p_.commit()
        return ret

    def find_firsts(self,
                    byname=None,
                    bypath=undefined,
                    byoption=undefined,
                    byvalue=undefined,
                    raise_if_not_found=True,
                    _sub=False,
                    check_properties=True):
        """Find first not in current GroupConfig, but in each children

        :returns: a GroupConfig of the matching children (a plain list when
                  `_sub` is true)
        """
        ret = []

        # if MetaConfig, all children have same OptionDescription in context
        # so search only one time the option for all children
        if bypath is undefined and byname is not None and \
                isinstance(self, MetaConfig):
            bypath = self._find(bytype=None,
                                byvalue=undefined,
                                byname=byname,
                                first=True,
                                type_='path',
                                check_properties=None,
                                raise_if_not_found=raise_if_not_found)
            byname = None
            byoption = self.cfgimpl_get_description().impl_get_opt_by_path(bypath)

        for child in self._impl_children:
            if isinstance(child, GroupConfig):
                ret.extend(child.find_firsts(byname=byname,
                                             bypath=bypath,
                                             byoption=byoption,
                                             byvalue=byvalue,
                                             check_properties=check_properties,
                                             raise_if_not_found=False,
                                             _sub=True))
            elif child._find(None,
                             byname,
                             byvalue,
                             first=True,
                             type_='path',
                             raise_if_not_found=False,
                             check_properties=check_properties,
                             only_path=bypath,
                             only_option=byoption):
                ret.append(child)
        if _sub:
            return ret
        else:
            return GroupConfig(self._find_return_results(ret,
                                                         raise_if_not_found))

    def __str__(self):
        """Return one "(name)" line per child, followed by the parent
        representation when there is one."""
        ret = ''.join("({0})\n".format(child._impl_name)
                      for child in self._impl_children)
        # While no ancestor overrides __str__, the parent implementation is
        # object.__str__, which calls __repr__ -- aliased below to this very
        # method. Delegating in that case recursed until RecursionError, so
        # delegate only when an ancestor really provides its own __str__.
        if self._impl_descr is not None and \
                _CommonConfig.__str__ is not object.__str__:
            ret += super(GroupConfig, self).__str__()
        return ret

    __repr__ = __str__

    def getconfig(self,
                  name):
        """Return the child whose `impl_getname()` is `name`.

        :raises ConfigError: if no child has this name
        """
        for child in self._impl_children:
            if name == child.impl_getname():
                return child
        raise ConfigError(_('unknown config {}').format(name))


class MetaConfig(GroupConfig):
    """GroupConfig whose children all share the same option description.

    NOTE(review): `set_value` and `reset` still call `getattr`, `setattr`
    and `Values.reset` with arguments that do not match the signatures used
    elsewhere in this module; these calls look unfinished and have been
    left untouched.
    """
    __slots__ = tuple()

    def __init__(self,
                 children,
                 session_id=None,
                 persistent=False,
                 optiondescription=None,
                 _force_store_values=True):
        """
        :param children: list of configs or, if `optiondescription` is
                         given, list of session IDs used to create one
                         Config per ID
        :param optiondescription: description used to build the children
        :raises TypeError: if a child is not a config
        :raises ValueError: if a child already has a metaconfig or if the
                            children do not share the same description
        """
        descr = None
        if optiondescription is not None:
            children = [Config(optiondescription,
                               persistent=persistent,
                               session_id=child_session_id,
                               _force_store_values=_force_store_values)
                        for child_session_id in children]
        for child in children:
            if not isinstance(child, _CommonConfig):
                raise TypeError(_("metaconfig's children "
                                  "should be config, not {0}"
                                  ).format(type(child)))
            if child.cfgimpl_get_meta() is not None:
                raise ValueError(_("child has already a metaconfig's"))
            if descr is None:
                descr = child.cfgimpl_get_description()
            elif descr is not child.cfgimpl_get_description():
                raise ValueError(_('all config in metaconfig must '
                                   'have the same optiondescription'))
            child._impl_meta = weakref.ref(self)

        super(MetaConfig, self).__init__(children,
                                         session_id,
                                         persistent,
                                         descr)

    def set_value(self,
                  path,
                  value,
                  force_default=False,
                  force_dont_change_value=False,
                  force_default_if_same=False,
                  only_config=False,
                  _commit=True):
        """Set `value` in this MetaConfig.

        :param force_default: reset the value of every child
        :param force_default_if_same: reset the value of the children whose
                                      own value equals `value`
        :param force_dont_change_value: store in each child its current
                                        value when it differs from `value`
        :param only_config: could be set if you want modify value in all
                            Config included in this MetaConfig
        :returns: list of the errors collected on the children
        :raises ValueError: on incompatible `force_*`/`only_config` flags
        """
        if only_config:
            if force_default or force_default_if_same or force_dont_change_value:
                raise ValueError(_('force_default, force_default_if_same or '
                                   'force_dont_change_value cannot be set with'
                                   ' only_config'))
            return super(MetaConfig, self).set_value(path,
                                                     value,
                                                     _commit=_commit)
        ret = []
        if force_default or force_default_if_same or force_dont_change_value:
            if force_default and force_dont_change_value:
                raise ValueError(_('force_default and force_dont_change_value'
                                   ' cannot be set together'))
            opt = self.cfgimpl_get_description().impl_get_opt_by_path(path)
            setting_properties = self.cfgimpl_get_settings()._getproperties(read_write=False)
            for child in self._impl_children:
                if force_default_if_same or force_default:
                    if force_default_if_same:
                        if not child.cfgimpl_get_values()._contains(path):
                            child_value = undefined
                        else:
                            child_value = child.getattr(path)
                    # `child_value` is only bound when force_default_if_same
                    # is set; `force_default` short-circuits otherwise
                    if force_default or value == child_value:
                        child.cfgimpl_get_values().reset(opt,
                                                         path,
                                                         setting_properties,
                                                         validate=False,
                                                         _commit=False)
                        continue
                if force_dont_change_value:
                    child_value = child.getattr(path,
                                                setting_properties=setting_properties)
                    if isinstance(child_value, Exception):
                        ret.append(child_value)
                    elif value != child_value:
                        childret = child.setattr(path,
                                                 child_value,
                                                 _commit=False,
                                                 not_raises=True)
                        if childret is not None:  # pragma: no cover
                            ret.append(childret)
        self.setattr(path,
                     value,
                     _commit=_commit)
        return ret

    def reset(self, path):
        """Reset the value of `path` in every child, then in this
        MetaConfig."""
        opt = self.cfgimpl_get_description().impl_get_opt_by_path(path)
        setting_properties = self.cfgimpl_get_settings()._getproperties(read_write=False)
        for child in self._impl_children:
            child.cfgimpl_get_values().reset(opt,
                                             path,
                                             setting_properties,
                                             validate=False,
                                             _commit=False)
        self.cfgimpl_get_values().reset(opt,
                                        path,
                                        setting_properties,
                                        validate=False)

    def new_config(self,
                   session_id,
                   persistent=False):
        """Create a Config on the description of this MetaConfig, attach it
        as a new child and return it.

        :raises ConflictError: if a child already has the same name
        """
        config = Config(self._impl_descr,
                        session_id=session_id,
                        persistent=persistent)

        if config._impl_name in [child._impl_name for child in self._impl_children]:  # pragma: no cover
            raise ConflictError(_('config name must be uniq in '
                                  'groupconfig for {0}').format(config._impl_name))
        config._impl_meta = weakref.ref(self)
        self._impl_children.append(config)
        return config