# -*- coding: utf-8 -*- # Copyright (C) 2012-2019 Team tiramisu (see AUTHORS for all contributors) # # This program is free software: you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by the # Free Software Foundation, either version 3 of the License, or (at your # option) any later version. # # This program is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more # details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . # # The original `Config` design model is unproudly borrowed from # the rough pypy's guys: http://codespeak.net/svn/pypy/dist/pypy/config/ # the whole pypy projet is under MIT licence # ____________________________________________________________ "options handler global entry point" import weakref from copy import copy from .error import PropertiesOptionError, ConfigError, ConflictError, \ LeadershipError from .option import SynDynOptionDescription, DynOptionDescription, Leadership from .option.baseoption import BaseOption, valid_name from .setting import OptionBag, ConfigBag, Settings, undefined, groups from .storage import get_storages, gen_storage_id, get_default_values_storages, list_sessions, Cache from .value import Values from .i18n import _ class SubConfig(object): """Sub configuration management entry. Tree if OptionDescription's responsability. SubConfig are generated on-demand. A Config is also a SubConfig. Root Config is call context below """ __slots__ = ('_impl_context', '_impl_descr', '_impl_path', '_impl_length') def __init__(self, descr, context, config_bag, subpath=None): """ Configuration option management class :param descr: describes the configuration schema :type descr: an instance of ``option.OptionDescription`` :param context: the current root config :type context: `Config` :type subpath: `str` with the path name """ # main option description if __debug__ and descr is not None and \ (not isinstance(descr, (BaseOption, SynDynOptionDescription)) or not descr.impl_is_optiondescription()): try: msg = descr.impl_get_displayname() except AttributeError: msg = descr raise TypeError(_('"{0}" must be an optiondescription, not an {1}' ).format(msg, type(descr))) self._impl_descr = descr self._impl_context = context self._impl_path = subpath if descr is not None and descr.impl_is_leadership(): leader = descr.get_leader() leaderpath = leader.impl_getname() full_leaderpath = self._get_subpath(leaderpath) cconfig_bag = config_bag.copy() cconfig_bag.remove_validation() moption_bag = OptionBag() moption_bag.set_option(leader, full_leaderpath, None, cconfig_bag) value = self.getattr(leaderpath, moption_bag) self._impl_length = len(value) def cfgimpl_get_length(self): return self._impl_length def cfgimpl_get_length_leadership(self, option_bag): if option_bag.option.impl_is_symlinkoption(): context = self.cfgimpl_get_context() path = option_bag.option.impl_getopt().impl_getpath() subconfig, _ = context.cfgimpl_get_home_by_path(path, option_bag.config_bag) return subconfig.cfgimpl_get_length() else: return self.cfgimpl_get_length() def reset_one_option_cache(self, desc, resetted_opts, option_bag): if option_bag.path in resetted_opts: return resetted_opts.append(option_bag.path) for woption in option_bag.option._get_dependencies(self.cfgimpl_get_description()): option = woption() if option.impl_is_dynoptiondescription(): subpath = option.impl_getpath().rsplit('.', 1)[0] for suffix in option.get_suffixes(option_bag.config_bag): doption = option.to_dynoption(subpath, suffix) doption_path = doption.impl_getpath() doption_bag = OptionBag() doption_bag.set_option(doption, doption_path, option_bag.index, option_bag.config_bag) self.reset_one_option_cache(desc, resetted_opts, doption_bag) elif option.issubdyn(): dynopt = option.getsubdyn() rootpath = dynopt.impl_getpath() subpaths = [rootpath] + option.impl_getpath()[len(rootpath) + 1:].split('.')[:-1] for suffix in dynopt.get_suffixes(option_bag.config_bag): subpath = '.'.join([subp + suffix for subp in subpaths]) doption = option.to_dynoption(subpath, suffix) doption_path = doption.impl_getpath() doption_bag = OptionBag() doption_bag.set_option(doption, doption_path, option_bag.index, option_bag.config_bag) self.reset_one_option_cache(desc, resetted_opts, doption_bag) else: option_path = option.impl_getpath() doption_bag = OptionBag() doption_bag.set_option(option, option_path, option_bag.index, option_bag.config_bag) self.reset_one_option_cache(desc, resetted_opts, doption_bag) del option option_bag.option.reset_cache(option_bag.path, option_bag.config_bag, resetted_opts) def cfgimpl_reset_cache(self, option_bag, resetted_opts=None): """reset all settings in cache """ if resetted_opts is None: resetted_opts = [] context = self.cfgimpl_get_context() desc = context.cfgimpl_get_description() if option_bag is not None: self.reset_one_option_cache(desc, resetted_opts, option_bag) else: context._impl_values_cache.reset_all_cache() context._impl_properties_cache.reset_all_cache() def cfgimpl_get_home_by_path(self, path, config_bag): """:returns: tuple (config, name)""" path = path.split('.') for step in path[:-1]: option_bag = OptionBag() option = self.cfgimpl_get_description().get_child(step, config_bag, self.cfgimpl_get_path()) subpath = self._get_subpath(step) option_bag.set_option(option, subpath, None, config_bag) self = self.get_subconfig(option_bag) assert isinstance(self, SubConfig), _('unknown option {}').format(path[-1]) return self, path[-1] # ______________________________________________________________________ def cfgimpl_get_context(self): return self._impl_context() def cfgimpl_get_description(self): assert self._impl_descr is not None, _('there is no option description for this config' ' (may be GroupConfig)') return self._impl_descr def cfgimpl_get_settings(self): return self.cfgimpl_get_context()._impl_settings def cfgimpl_get_values(self): return self.cfgimpl_get_context()._impl_values def setattr(self, value, option_bag, _commit=True): if option_bag.option.impl_is_symlinkoption(): raise ConfigError(_("can't set value to a SymLinkOption")) context = option_bag.config_bag.context context.cfgimpl_get_settings().validate_properties(option_bag) if option_bag.option.impl_is_leader() and len(value) < self._impl_length: raise LeadershipError(_('cannot reduce length of the leader "{}"' '').format(option_bag.option.impl_get_display_name())) return context.cfgimpl_get_values().setvalue(value, option_bag, _commit) def delattr(self, option_bag, _commit=True): option = option_bag.option if option.impl_is_symlinkoption(): raise ConfigError(_("can't delete a SymLinkOption")) values = self.cfgimpl_get_values() if option_bag.index is not None: values.reset_follower(option_bag, _commit) else: values.reset(option_bag, _commit) def _get_subpath(self, name): if self._impl_path is None: subpath = name else: subpath = self._impl_path + '.' + name return subpath def get_subconfig(self, option_bag): self.cfgimpl_get_settings().validate_properties(option_bag) return SubConfig(option_bag.option, self._impl_context, option_bag.config_bag, option_bag.path) def getattr(self, name, option_bag, from_follower=False, needs_re_verify_follower_properties=False): """ :return: option's value if name is an option name, OptionDescription otherwise """ config_bag = option_bag.config_bag if '.' in name: self, name = self.cfgimpl_get_home_by_path(name, config_bag) option = option_bag.option if option.impl_is_symlinkoption(): soption_bag = OptionBag() soption_bag.set_option(option.impl_getopt(), None, option_bag.index, config_bag) soption_bag.ori_option = option context = self.cfgimpl_get_context() return context.getattr(soption_bag.path, soption_bag) #if not from_follower or needs_re_verify_follower_properties: if option.impl_is_follower() and not from_follower: needs_re_verify_follower_properties = self.cfgimpl_get_settings().has_properties_index(option_bag) if not option.impl_is_follower() or \ (needs_re_verify_follower_properties and option_bag.index is not None) or \ (not needs_re_verify_follower_properties and (not from_follower or option_bag.index is None)): self.cfgimpl_get_settings().validate_properties(option_bag) if option.impl_is_follower() and not from_follower: length = self.cfgimpl_get_length_leadership(option_bag) follower_len = self.cfgimpl_get_values()._p_.get_max_length(option_bag.path) if follower_len > length: raise LeadershipError(_('the follower option "{}" has greater length ({}) than the leader ' 'length ({})').format(option.impl_get_display_name(), follower_len, length, option_bag.index)) if option.impl_is_follower() and option_bag.index is None: value = [] for idx in range(length): soption_bag = OptionBag() soption_bag.set_option(option, option_bag.path, idx, config_bag) try: value.append(self.getattr(name, soption_bag, from_follower=True, needs_re_verify_follower_properties=needs_re_verify_follower_properties)) except PropertiesOptionError as err: value.append(err) else: value = self.cfgimpl_get_values().get_cached_value(option_bag) self.cfgimpl_get_settings().validate_mandatory(value, option_bag) return value def find(self, bytype, byname, byvalue, config_bag, _subpath=None, raise_if_not_found=True, only_path=undefined, only_option=undefined, with_option=False): """ convenience method for finding an option that lives only in the subtree :param first: return only one option if True, a list otherwise :return: find list or an exception if nothing has been found """ def _filter_by_value(soption_bag): try: value = context.getattr(path, soption_bag) except PropertiesOptionError: return False if isinstance(value, list): return byvalue in value else: return value == byvalue found = False if only_path is not undefined: options = [only_option] else: options = self.cfgimpl_get_description().get_children_recursively(bytype, byname, config_bag) context = self.cfgimpl_get_context() for option in options: option_bag = OptionBag() path = option.impl_getpath() option_bag.set_option(option, path, None, config_bag) if byvalue is not undefined and not _filter_by_value(option_bag): continue elif config_bag.properties: #remove option with propertyerror, ... try: if '.' in path: subconfig, subpath = context.cfgimpl_get_home_by_path(path, config_bag) else: subconfig = self subpath = path subconfig.cfgimpl_get_description().get_child(subpath, config_bag, subconfig.cfgimpl_get_path()) self.cfgimpl_get_settings().validate_properties(option_bag) except PropertiesOptionError: continue found = True if not with_option: yield path else: yield path, option self._find_return_results(found, raise_if_not_found) def _find_return_results(self, found, raise_if_not_found): if not found and raise_if_not_found: raise AttributeError(_("no option found in config" " with these criteria")) def make_dict(self, config_bag, flatten=False, _currpath=None, withoption=None, withvalue=undefined, fullpath=False): """exports the whole config into a `dict` :returns: dict of Option's name (or path) and values """ pathsvalues = {} if _currpath is None: _currpath = [] if withoption is None and withvalue is not undefined: raise ValueError(_("make_dict can't filtering with value without " "option")) context = self.cfgimpl_get_context() self._make_dict(context, config_bag, flatten, _currpath, withoption, withvalue, fullpath, pathsvalues) return pathsvalues def _make_dict(self, context, config_bag, flatten, _currpath, withoption, withvalue, fullpath, pathsvalues): if withoption is not None: # Find all option with criteria # retrieve OptionDescription and make_dict on it mypath = self.cfgimpl_get_path() for path in context.find(bytype=None, byname=withoption, byvalue=withvalue, _subpath=self.cfgimpl_get_path(False), config_bag=config_bag): path = '.'.join(path.split('.')[:-1]) if '.' in path: subconfig, subpath = context.cfgimpl_get_home_by_path(path, config_bag) else: subconfig = context subpath = path opt = subconfig.cfgimpl_get_description().get_child(subpath, config_bag, subconfig.cfgimpl_get_path()) soption_bag = OptionBag() soption_bag.set_option(opt, path, None, config_bag) if mypath is not None: if mypath == path: withoption = None withvalue = undefined break else: tmypath = mypath + '.' assert path.startswith(tmypath), _('unexpected path "{0}", ' 'should start with "{1}"' '').format(path, mypath) path = path[len(tmypath):] self._make_sub_dict(path, pathsvalues, _currpath, flatten, soption_bag, fullpath, context, withvalue) #withoption can be set to None below ! if withoption is None: for opt in self.cfgimpl_get_description().get_children(config_bag): name = opt.impl_getname() path = self._get_subpath(name) soption_bag = OptionBag() soption_bag.set_option(opt, path, None, config_bag) self._make_sub_dict(name, pathsvalues, _currpath, flatten, soption_bag, fullpath, context, withvalue) return pathsvalues def _make_sub_dict(self, name, pathsvalues, _currpath, flatten, option_bag, fullpath, context, withvalue): option = option_bag.option if option.impl_is_optiondescription(): try: self.cfgimpl_get_settings().validate_properties(option_bag) subconfig = SubConfig(option_bag.option, self._impl_context, option_bag.config_bag, option_bag.path) subconfig._make_dict(context, option_bag.config_bag, flatten, _currpath + [name], None, withvalue, fullpath, pathsvalues) except PropertiesOptionError as err: if err.proptype in (['mandatory'], ['empty']): raise err else: try: ret = self.getattr(name, option_bag) except PropertiesOptionError as err: # import traceback # traceback.print_exc() if err.proptype in (['mandatory'], ['empty']): raise err return if flatten: name_ = option.impl_getname() elif fullpath: name_ = self._get_subpath(name) else: name_ = '.'.join(_currpath + [name]) pathsvalues[name_] = ret def cfgimpl_get_path(self, dyn=True): descr = self.cfgimpl_get_description() if not dyn and descr.impl_is_dynoptiondescription(): return descr.impl_getopt().impl_getpath() return self._impl_path class _CommonConfig(SubConfig): "abstract base class for the Config, KernelGroupConfig and the KernelMetaConfig" __slots__ = ('_impl_values', '_impl_values_cache', '_impl_settings', '_impl_properties_cache', '_impl_permissives_cache', 'parents', 'impl_type') def _impl_build_all_caches(self): descr = self.cfgimpl_get_description() if not descr.impl_already_build_caches(): descr._group_type = groups.root descr._build_cache(display_name=self._display_name) config_bag = ConfigBag(context=self) descr.impl_build_force_store_values(config_bag) def get_parents(self): for parent in self.parents: yield parent() # information def impl_set_information(self, key, value): """updates the information's attribute :param key: information's key (ex: "help", "doc" :param value: information's value (ex: "the help string") """ self._impl_values.set_information(key, value) def impl_get_information(self, key, default=undefined): """retrieves one information's item :param key: the item string (ex: "help") """ return self._impl_values.get_information(key, default) def impl_del_information(self, key, raises=True): self._impl_values.del_information(key, raises) def impl_list_information(self): return self._impl_values.list_information() def __getstate__(self): raise NotImplementedError() def _gen_fake_values(self): fake_config = KernelConfig(self._impl_descr, persistent=False, force_values=get_default_values_storages(), force_settings=self.cfgimpl_get_settings(), display_name=self._display_name) fake_config.cfgimpl_get_values()._p_.importation(self.cfgimpl_get_values()._p_.exportation()) fake_config.parents = self.parents return fake_config def duplicate(self, session_id=None, force_values=None, force_settings=None, storage=None, persistent=False, metaconfig_prefix=None, child=None, deep=None): assert isinstance(self, (KernelConfig, KernelMixConfig)), _('cannot duplicate {}').format(self.__class__.__name__) if isinstance(self, KernelConfig): duplicated_config = KernelConfig(self._impl_descr, _duplicate=True, session_id=session_id, force_values=force_values, force_settings=force_settings, persistent=persistent, storage=storage, display_name=self._display_name) else: if session_id is None and metaconfig_prefix is not None: session_id = metaconfig_prefix + self.impl_getname() duplicated_config = KernelMetaConfig([], _duplicate=True, optiondescription=self._impl_descr, session_id=session_id, persistent=persistent, storage=storage, display_name=self._display_name) duplicated_config.cfgimpl_get_values()._p_.importation(self.cfgimpl_get_values()._p_.exportation()) properties = self.cfgimpl_get_settings()._p_.exportation() duplicated_config.cfgimpl_get_settings()._p_.importation(properties) duplicated_config.cfgimpl_get_settings()._pp_.importation(self.cfgimpl_get_settings( )._pp_.exportation()) duplicated_config.cfgimpl_get_settings().ro_append = self.cfgimpl_get_settings().ro_append duplicated_config.cfgimpl_get_settings().rw_append = self.cfgimpl_get_settings().rw_append duplicated_config.cfgimpl_get_settings().ro_remove = self.cfgimpl_get_settings().ro_remove duplicated_config.cfgimpl_get_settings().rw_remove = self.cfgimpl_get_settings().rw_remove duplicated_config.cfgimpl_get_settings().default_properties = self.cfgimpl_get_settings().default_properties duplicated_config.cfgimpl_reset_cache(None, None) if child is not None: duplicated_config._impl_children.append(child) child.parents.append(weakref.ref(duplicated_config)) if self.parents: if deep is not None: for parent in self.parents: wparent = parent() if wparent not in deep: deep.append(wparent) duplicated_config = wparent.duplicate(deep=deep, storage=storage, metaconfig_prefix=metaconfig_prefix, child=duplicated_config, persistent=persistent) else: duplicated_config.parents = self.parents for parent in self.parents: parent()._impl_children.append(duplicated_config) return duplicated_config def cfgimpl_get_config_path(self): path = self._impl_name for parent in self.parents: wparent = parent() if wparent is None: raise ConfigError(_('parent of {} not already exists').format(self._impl_name)) path = parent().cfgimpl_get_config_path() + '.' + path return path # ____________________________________________________________ class KernelConfig(_CommonConfig): "main configuration management entry" __slots__ = ('__weakref__', '_impl_name', '_display_name', '_impl_symlink') impl_type = 'config' def __init__(self, descr, session_id=None, persistent=False, force_values=None, force_settings=None, display_name=None, _duplicate=False, storage=None): """ Configuration option management class :param descr: describes the configuration schema :type descr: an instance of ``option.OptionDescription`` :param context: the current root config :type context: `Config` :param session_id: session ID is import with persistent Config to retrieve good session :type session_id: `str` :param persistent: if persistent, don't delete storage when leaving :type persistent: `boolean` """ self.parents = [] self._impl_symlink = [] self._display_name = display_name if isinstance(descr, Leadership): raise ConfigError(_('cannot set leadership object has root optiondescription')) if isinstance(descr, DynOptionDescription): raise ConfigError(_('cannot set dynoptiondescription object has root optiondescription')) if force_settings is not None and force_values is not None: if isinstance(force_settings, tuple): self._impl_settings = Settings(force_settings[0], force_settings[1]) else: self._impl_settings = force_settings self._impl_permissives_cache = Cache() self._impl_properties_cache = Cache() self._impl_values = Values(force_values) self._impl_values_cache = Cache() else: properties, permissives, values, session_id = get_storages(self, session_id, persistent, storage=storage) if not valid_name(session_id): raise ValueError(_("invalid session ID: {0} for config").format(session_id)) self._impl_settings = Settings(properties, permissives) self._impl_permissives_cache = Cache() self._impl_properties_cache = Cache() self._impl_values = Values(values) self._impl_values_cache = Cache() super().__init__(descr, weakref.ref(self), ConfigBag(self), None) if None in [force_settings, force_values]: self._impl_build_all_caches() self._impl_name = session_id def impl_getname(self): return self._impl_name class KernelGroupConfig(_CommonConfig): __slots__ = ('__weakref__', '_impl_children', '_impl_name') impl_type = 'group' def __init__(self, children, session_id=None, _descr=None): assert isinstance(children, list), _("groupconfig's children must be a list") names = [] for child in children: assert isinstance(child, _CommonConfig), _("groupconfig's children must be Config, MetaConfig or GroupConfig") name_ = child._impl_name names.append(name_) if len(names) != len(set(names)): for idx in range(1, len(names) + 1): name = names.pop(0) if name in names: raise ConflictError(_('config name must be uniq in ' 'groupconfig for "{0}"').format(name)) self._impl_children = children self.parents = [] session_id = gen_storage_id(session_id, self) assert valid_name(session_id), _("invalid session ID: {0} for config").format(session_id) super().__init__(_descr, weakref.ref(self), ConfigBag(self), None) self._impl_name = session_id def cfgimpl_get_children(self): return self._impl_children def cfgimpl_reset_cache(self, option_bag, resetted_opts=None): if resetted_opts is None: resetted_opts = [] if isinstance(self, KernelMixConfig): super().cfgimpl_reset_cache(option_bag, resetted_opts=copy(resetted_opts)) for child in self._impl_children: if option_bag is not None: coption_bag = option_bag.copy() cconfig_bag = coption_bag.config_bag.copy() cconfig_bag.context = child coption_bag.config_bag = cconfig_bag else: coption_bag = None child.cfgimpl_reset_cache(coption_bag, resetted_opts=copy(resetted_opts)) def set_value(self, path, index, value, config_bag, only_config=False, _commit=True): """Setattr not in current KernelGroupConfig, but in each children """ ret = [] if self.impl_type == 'group': # No value so cannot commit only one time commit = True else: # Commit only one time commit = False for child in self._impl_children: cconfig_bag = config_bag.copy() cconfig_bag.context = child if isinstance(child, KernelGroupConfig): ret.extend(child.set_value(path, index, value, cconfig_bag, only_config=only_config, _commit=commit)) else: try: subconfig, name = child.cfgimpl_get_home_by_path(path, cconfig_bag) option = subconfig.cfgimpl_get_description().get_child(name, cconfig_bag, child.cfgimpl_get_path()) option_bag = OptionBag() option_bag.set_option(option, path, index, cconfig_bag) child.setattr(value, option_bag, _commit=commit) except PropertiesOptionError as err: ret.append(PropertiesOptionError(err._option_bag, err.proptype, err._settings, err._opt_type, err._name, err._orig_opt)) except (ValueError, LeadershipError, AttributeError) as err: ret.append(err) if _commit and self.impl_type != 'group': self.cfgimpl_get_values()._p_.commit() return ret def find_group(self, config_bag, byname=None, bypath=undefined, byoption=undefined, byvalue=undefined, raise_if_not_found=True, _sub=False): """Find first not in current KernelGroupConfig, but in each children """ # if KernelMetaConfig, all children have same OptionDescription in # context so search only one time the option for all children if bypath is undefined and byname is not None and \ isinstance(self, KernelMixConfig): bypath, byoption = next(self.find(bytype=None, byvalue=undefined, byname=byname, config_bag=config_bag, raise_if_not_found=raise_if_not_found, with_option=True)) byname = None ret = [] for child in self._impl_children: if isinstance(child, KernelGroupConfig): ret.extend(child.find_group(byname=byname, bypath=bypath, byoption=byoption, byvalue=byvalue, config_bag=config_bag, raise_if_not_found=False, _sub=True)) else: try: cconfig_bag = config_bag.copy() cconfig_bag.context = child next(child.find(None, byname, byvalue, config_bag=cconfig_bag, raise_if_not_found=False, only_path=bypath, only_option=byoption)) ret.append(child) except StopIteration: pass if not _sub: self._find_return_results(ret != [], raise_if_not_found) return ret def impl_getname(self): return self._impl_name def reset(self, path, _commit=True): for child in self._impl_children: config_bag = ConfigBag(child) config_bag.remove_validation() subconfig, name = child.cfgimpl_get_home_by_path(path, config_bag) option = subconfig.cfgimpl_get_description().get_child(name, config_bag, subconfig.cfgimpl_get_path()) option_bag = OptionBag() option_bag.set_option(option, path, option, config_bag) option_bag.config_bag.context = child child.cfgimpl_get_values().reset(option_bag, _commit=_commit) def getconfig(self, name): for child in self._impl_children: if name == child.impl_getname(): return child raise ConfigError(_('unknown config "{}"').format(name)) class KernelMixConfig(KernelGroupConfig): __slots__ = ('_display_name', '_impl_symlink') impl_type = 'mix' def __init__(self, optiondescription, children, session_id=None, persistent=False, storage=None, display_name=None, _duplicate=False): # FIXME _duplicate self._display_name = display_name self._impl_symlink = [] for child in children: if not isinstance(child, (KernelConfig, KernelMixConfig)): raise TypeError(_("child must be a Config, MixConfig or MetaConfig")) child.parents.append(weakref.ref(self)) properties, permissives, values, session_id = get_storages(self, session_id, persistent, storage=storage) self._impl_settings = Settings(properties, permissives) self._impl_permissives_cache = Cache() self._impl_properties_cache = Cache() self._impl_values = Values(values) self._impl_values_cache = Cache() super().__init__(children, session_id=session_id, _descr=optiondescription) self._impl_build_all_caches() def set_value(self, path, index, value, config_bag, force_default=False, force_dont_change_value=False, force_default_if_same=False, only_config=False, _commit=True): """only_config: could be set if you want modify value in all Config included in this KernelMetaConfig """ if only_config: if force_default or force_default_if_same or force_dont_change_value: raise ValueError(_('force_default, force_default_if_same or ' 'force_dont_change_value cannot be set with' ' only_config')) return super().set_value(path, index, value, config_bag, only_config=only_config, _commit=_commit) ret = [] subconfig, name = self.cfgimpl_get_home_by_path(path, config_bag) option = subconfig.cfgimpl_get_description().get_child(name, config_bag, self.cfgimpl_get_path()) option_bag = OptionBag() option_bag.set_option(option, path, index, config_bag) if force_default or force_default_if_same or force_dont_change_value: if force_default and force_dont_change_value: raise ValueError(_('force_default and force_dont_change_value' ' cannot be set together')) for child in self._impl_children: cconfig_bag = config_bag.copy() cconfig_bag.context = child try: subconfig2, name = child.cfgimpl_get_home_by_path(path, cconfig_bag) if self.impl_type == 'meta': moption_bag = option_bag del moption_bag.properties del moption_bag.permissives moption_bag.config_bag = cconfig_bag else: option = subconfig2.cfgimpl_get_description().get_child(name, cconfig_bag, child.cfgimpl_get_path()) moption_bag = OptionBag() moption_bag.set_option(option, path, index, cconfig_bag) if force_default_if_same: if not child.cfgimpl_get_values()._p_.hasvalue(path): child_value = undefined else: child_value = subconfig2.getattr(name, moption_bag) if force_default or (force_default_if_same and value == child_value): child.cfgimpl_get_values().reset(moption_bag, _commit=False) continue if force_dont_change_value: child_value = child.getattr(name, moption_bag) if value != child_value: subconfig2.setattr(child_value, moption_bag, _commit=False) except PropertiesOptionError as err: ret.append(PropertiesOptionError(err._option_bag, err.proptype, err._settings, err._opt_type, err._name, err._orig_opt)) except (ValueError, LeadershipError, AttributeError) as err: ret.append(err) try: if self.impl_type == 'meta': del option_bag.properties del option_bag.permissives option_bag.config_bag = config_bag subconfig.setattr(value, option_bag, _commit=False) except (PropertiesOptionError, ValueError, LeadershipError) as err: ret.append(err) return ret def reset(self, path, only_children, config_bag, commit=True): rconfig_bag = config_bag.copy() rconfig_bag.remove_validation() if self.impl_type == 'meta': subconfig, name = self.cfgimpl_get_home_by_path(path, config_bag) option = subconfig.cfgimpl_get_description().get_child(name, config_bag, subconfig.cfgimpl_get_path()) option_bag = OptionBag() option_bag.set_option(option, path, None, rconfig_bag) elif not only_children: try: subconfig, name = self.cfgimpl_get_home_by_path(path, config_bag) option = subconfig.cfgimpl_get_description().get_child(name, config_bag, subconfig.cfgimpl_get_path()) option_bag = OptionBag() option_bag.set_option(option, path, None, rconfig_bag) except AttributeError: only_children = True for child in self._impl_children: rconfig_bag.context = child try: if self.impl_type == 'meta': moption_bag = option_bag moption_bag.config_bag = rconfig_bag else: subconfig, name = child.cfgimpl_get_home_by_path(path, rconfig_bag) option = subconfig.cfgimpl_get_description().get_child(name, rconfig_bag, child.cfgimpl_get_path()) moption_bag = OptionBag() moption_bag.set_option(option, path, None, rconfig_bag) child.cfgimpl_get_values().reset(moption_bag, _commit=False) except AttributeError: pass if isinstance(child, KernelMixConfig): child.reset(path, False, rconfig_bag, commit=False) if not only_children: option_bag.config_bag = config_bag self.cfgimpl_get_values().reset(option_bag, _commit=False) if commit: self.cfgimpl_get_values()._p_.commit() def add_config(self, apiconfig): config = apiconfig._config_bag.context if config.impl_getname() in [child.impl_getname() for child in self._impl_children]: raise ConflictError(_('config name must be uniq in ' 'groupconfig for {0}').format(config.impl_getname())) config.parents.append(weakref.ref(self)) self._impl_children.append(config) config.cfgimpl_reset_cache(None, None) def pop_config(self, session_id, config): if session_id is not None: for idx, child in enumerate(self._impl_children): if session_id == child.impl_getname(): child.cfgimpl_reset_cache(None, None) self._impl_children.pop(idx) break else: raise ConfigError(_('cannot find the config {}').format(session_id)) if config is not None: self._impl_children.remove(config) child = config for index, parent in enumerate(child.parents): if parent() == self: child.parents.pop(index) break else: raise ConfigError(_('cannot find the config {}').format(self.session_id)) return child class KernelMetaConfig(KernelMixConfig): __slots__ = tuple() impl_type = 'meta' def __init__(self, children, session_id=None, persistent=False, optiondescription=None, storage=None, display_name=None, _duplicate=False): descr = None self._display_name = display_name if optiondescription is not None: if not _duplicate: new_children = [] for child_session_id in children: assert isinstance(child_session_id, str), _('MetaConfig with optiondescription' ' must have string has child, ' 'not {}').format(child_session_id) new_children.append(KernelConfig(optiondescription, persistent=persistent, session_id=child_session_id, display_name=self._display_name)) children = new_children descr = optiondescription for child in children: if __debug__ and not isinstance(child, (KernelConfig, KernelMetaConfig)): raise TypeError(_("child must be a Config or MetaConfig")) if descr is None: descr = child.cfgimpl_get_description() elif descr is not child.cfgimpl_get_description(): raise ValueError(_('all config in metaconfig must ' 'have the same optiondescription')) super().__init__(descr, children, persistent=persistent, storage=storage, session_id=session_id) def new_config(self, session_id, type_='config', persistent=False): if session_id in [child.impl_getname() for child in self._impl_children]: raise ConflictError(_('config name must be uniq in ' 'groupconfig for {0}').format(session_id)) assert type_ in ('config', 'metaconfig', 'mixconfig'), _('unknown type {}').format(type_) new = not persistent or session_id not in list_sessions() if type_ == 'config': config = KernelConfig(self._impl_descr, session_id=session_id, persistent=persistent, display_name=self._display_name) elif type_ == 'metaconfig': config = KernelMetaConfig([], optiondescription=self._impl_descr, session_id=session_id, persistent=persistent, display_name=self._display_name) elif type_ == 'mixconfig': config = KernelMixConfig(children=[], optiondescription=self._impl_descr, session_id=session_id, persistent=persistent, display_name=self._display_name) # Copy context properties/permissives if new: config.cfgimpl_get_settings().set_context_properties(self.cfgimpl_get_settings().get_context_properties(config._impl_properties_cache), config) config.cfgimpl_get_settings().set_context_permissives(self.cfgimpl_get_settings().get_context_permissives()) config.cfgimpl_get_settings().ro_append = self.cfgimpl_get_settings().ro_append config.cfgimpl_get_settings().rw_append = self.cfgimpl_get_settings().rw_append config.cfgimpl_get_settings().ro_remove = self.cfgimpl_get_settings().ro_remove config.cfgimpl_get_settings().rw_remove = self.cfgimpl_get_settings().rw_remove config.cfgimpl_get_settings().default_properties = self.cfgimpl_get_settings().default_properties config.parents.append(weakref.ref(self)) self._impl_children.append(config) return config def add_config(self, apiconfig): if self._impl_descr is not apiconfig._config_bag.context.cfgimpl_get_description(): raise ValueError(_('metaconfig must ' 'have the same optiondescription')) super().add_config(apiconfig)