# -*- coding: utf-8 -*- # Copyright (C) 2017-2018 Team tiramisu (see AUTHORS for all contributors) # # This program is free software: you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by the # Free Software Foundation, either version 3 of the License, or (at your # option) any later version. # # This program is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more # details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . # ____________________________________________________________ from inspect import ismethod, getdoc, signature from time import time from typing import List, Any, Optional, Callable, Union, Dict from .error import APIError, ConfigError, SlaveError, PropertiesOptionError from .i18n import _ from .setting import ConfigBag, OptionBag, owners, groups, Undefined, undefined, FORBIDDEN_SET_PROPERTIES from .config import KernelConfig, SubConfig, KernelGroupConfig, KernelMetaConfig from .option import ChoiceOption, OptionDescription TIRAMISU_VERSION = 3 EXCLUDE_HELP = ('help', '_get_option', '_test_slave_index') class TiramisuHelp: icon = '\u2937' tmpl_help = '{0}{1} {2}: \n{0} {3}\n' def help(self, init: bool=True, space: str="", root: str='', _display: bool=True, _valid: bool=False) -> List[str]: options = [] if init and isinstance(self, TiramisuAPI): options.append(self.tmpl_help.format(space, self.icon, root + 'unrestraint', _('access to option without property restriction'))) options.append(self.tmpl_help.format(space, self.icon, root + 'forcepermissive', _('access to option without verifying permissive property'))) root = '[unrestraint.|forcepermissive.]' if 'registers' in dir(self): modules = list(self.registers.keys()) modules.sort() for module_name in modules: module = self.registers[module_name] try: instance_module = module(None) except TypeError: instance_module = module(None, None, None) if isinstance(instance_module, TiramisuDispatcher): if _valid and not getdoc(module.__call__): # pragma: no cover raise Exception('unknown doc for {}'.format('__call__')) module_doc = _(getdoc(module.__call__)) module_signature = signature(module.__call__) module_args = [str(module_signature.parameters[key]) for key in list(module_signature.parameters.keys())[1:]] module_args = '(' + ', '.join(module_args) + ')' options.append(self.tmpl_help.format(space, self.icon, root + module_name + module_args, module_doc)) if hasattr(module, 'subhelp'): instance_submodule = module.subhelp(None, None, None, None, None) options.extend(instance_submodule.help(init=False, space=space + ' ', root=root + module_name + module_args + '.')) else: root = root + '[config(path).]' if isinstance(instance_module, CommonTiramisuOption): if _valid and not getdoc(module): # pragma: no cover raise Exception('unknown doc for {}'.format(module.__class__.__name__)) module_doc = _(getdoc(module)) options.append(self.tmpl_help.format(space, self.icon, root + module_name, module_doc)) if isinstance(instance_module, TiramisuContext): if _valid and not getdoc(module): # pragma: no cover raise Exception('unknown doc for {}'.format(module.__class__.__name__)) module_doc = _(getdoc(module)) options.append(self.tmpl_help.format(space, self.icon, root + module_name, module_doc)) options.extend(instance_module.help(init=False, space=space + ' ', root=root + '{}.'.format(module_name))) funcs = dir(self) funcs.sort() for func_name in funcs: if not func_name.startswith('__') and not func_name in EXCLUDE_HELP: func = getattr(self, func_name) if ismethod(func): module_signature = signature(func) module_args = list(module_signature.parameters.keys()) module_args = [str(module_signature.parameters[key]) for key in module_signature.parameters.keys()] module_args = '(' + ', '.join(module_args) + ')' if func_name.startswith('_'): func_name = func_name[1:] if _valid and not getdoc(func): # pragma: no cover raise Exception('unknown doc for {}'.format(func.__name__)) options.append(self.tmpl_help.format(space, self.icon, root + func_name + module_args, _(getdoc(func)))) if init: if _display: # pragma: no cover print('\n'.join(options)) else: return options class CommonTiramisu(TiramisuHelp): allow_optiondescription = True registers = {} def _get_option(self) -> Any: option = self.option_bag.option if option is None: option = self.subconfig.cfgimpl_get_description().impl_getchild(self._name, self.option_bag.config_bag, self.subconfig.cfgimpl_get_path()) self.option_bag.set_option(option, self.option_bag.path, self.option_bag.index, self.option_bag.config_bag) self.option_bag.config_bag.context.cfgimpl_get_settings().validate_properties(self.option_bag) index = self.option_bag.index if index is not None: if option.impl_is_optiondescription() or not option.impl_is_master_slaves('slave'): raise APIError('index must be set only with a slave option') self._length = self.subconfig.cfgimpl_get_length_slave(self.option_bag) if index >= self._length: raise SlaveError(_('index "{}" is higher than the master length "{}" ' 'for option "{}"').format(index, self._length, option.impl_get_display_name())) if not self.allow_optiondescription and option.impl_is_optiondescription(): raise APIError(_('option must not be an optiondescription')) return option class CommonTiramisuOption(CommonTiramisu): allow_optiondescription = False slave_need_index = True def __init__(self, name: str, subconfig: Union[KernelConfig, SubConfig], option_bag: OptionBag) -> None: self.option_bag = option_bag self._name = name self.subconfig = subconfig # for help() if option_bag is not None and self.option_bag.config_bag.context.impl_type != 'group': self._get_option() if option_bag.config_bag is not None and self.slave_need_index: self._test_slave_index() def _test_slave_index(self) -> None: option = self.option_bag.option if not option.impl_is_optiondescription(): if self.option_bag.index is None and option.impl_is_master_slaves('slave'): raise APIError(_('index must be set with the slave option "{}"').format(self.option_bag.path)) elif self.option_bag.index is not None and not option.impl_is_master_slaves('slave'): raise APIError(_('index must be set only with a slave option, not for "{}"').format(self.option_bag.path)) # pragma: no cover def __getattr__(self, name): # pragma: no cover if not hasattr(CommonTiramisuOption, name): raise APIError(_('unknown method {}').format(name)) else: super().__getattribute__(name) class TiramisuOptionOption(CommonTiramisuOption): """manage option""" allow_optiondescription = True slave_need_index = False def get(self): """get Tiramisu option""" return self.option_bag.option def _ismulti(self): """test if option could have multi value""" option = self.option_bag.option return option.impl_is_multi() def _issubmulti(self): """test if option could have submulti value""" option = self.option_bag.option return option.impl_is_submulti() def ismasterslaves(self): """test if option is a master or a slave""" option = self.option_bag.option return option.impl_is_master_slaves() def _ismaster(self): """test if option is a master""" option = self.option_bag.option return option.impl_is_master_slaves('master') def _isslave(self): """test if option is a slave""" option = self.option_bag.option return option.impl_is_master_slaves('slave') def doc(self): """get option document""" option = self.option_bag.option return option.impl_get_display_name() def name(self): """get option name""" return self._name def path(self) -> str: """get option path""" return self.option_bag.path def _default(self): """get default value for an option (not for optiondescription)""" option = self.option_bag.option return option.impl_getdefault() def _defaultmulti(self): """get default value when added a value for a multi option (not for optiondescription)""" option = self.option_bag.option return option.impl_getdefault_multi() def has_dependency(self, self_is_dep=True): """test if option has dependency""" option = self.option_bag.option return option.impl_has_dependency(self_is_dep) def _consistencies(self): """get consistencies for an option (not for optiondescription)""" option = self.option_bag.option return option.get_consistencies() def _callbacks(self): """get callbacks for an option (not for optiondescription)""" option = self.option_bag.option return option.impl_get_callback() def requires(self): """get requires for an option""" option = self.option_bag.option return option.impl_getrequires() def __getattr__(self, name: str) -> Callable: option = self.option_bag.option if not option.impl_is_optiondescription() and not name.startswith('_'): return getattr(self, '_' + name) raise APIError(_('{} is unknown').format(name)) # pragma: no cover def isoptiondescription(self): """test if option is an optiondescription""" option = self.option_bag.option return option.impl_is_optiondescription() class TiramisuOptionOwner(CommonTiramisuOption): """manager option's owner""" def __init__(self, name: str, subconfig: Union[KernelConfig, SubConfig], option_bag: OptionBag) -> None: super().__init__(name, subconfig, option_bag) if option_bag is not None: # for help() self.values = self.option_bag.config_bag.context.cfgimpl_get_values() def get(self): """get owner for a specified option""" option = self.option_bag.option return self.values.getowner(self.option_bag) def isdefault(self): """is option has defaut value""" option = self.option_bag.option return self.values.is_default_owner(self.option_bag) def set(self, owner): """get owner for a specified option""" option = self.option_bag.option try: obj_owner = getattr(owners, owner) except AttributeError: owners.addowner(owner) obj_owner = getattr(owners, owner) self.values.setowner(obj_owner, self.option_bag) class TiramisuOptionProperty(CommonTiramisuOption): """manager option's property""" allow_optiondescription = True slave_need_index = False def __init__(self, name: str, subconfig: Union[KernelConfig, SubConfig], option_bag: OptionBag) -> None: super().__init__(name, subconfig, option_bag) if option_bag and option_bag.config_bag: self.settings = option_bag.config_bag.context.cfgimpl_get_settings() def get(self, apply_requires=True): """get properties for an option""" option = self.option_bag.option if apply_requires: self._test_slave_index() properties = self.option_bag.properties else: properties = self.settings.getproperties(self.option_bag, apply_requires=False) return set(properties) def add(self, prop): """add new property for an option""" option = self.option_bag.option if prop in FORBIDDEN_SET_PROPERTIES: raise ConfigError(_('cannot add this property: "{0}"').format( ' '.join(prop))) props = self.settings.getproperties(self.option_bag, apply_requires=False) self.settings.setproperties(self.option_bag.path, props | {prop}, self.option_bag, self.option_bag.config_bag.context) def pop(self, prop): """remove new property for an option""" option = self.option_bag.option props = self.settings.getproperties(self.option_bag, apply_requires=False) self.settings.setproperties(self.option_bag.path, props - {prop}, self.option_bag, self.option_bag.config_bag.context) def reset(self): """reset all personalised properties""" option = self.option_bag.option self.settings.reset(self.option_bag, self.option_bag.config_bag.context) class TiramisuOptionPermissive(CommonTiramisuOption): """manager option's property""" allow_optiondescription = True slave_need_index = False # FIXME should have same api than property def __init__(self, name: str, subconfig: Union[KernelConfig, SubConfig], option_bag: OptionBag) -> None: super().__init__(name, subconfig, option_bag) if option_bag and option_bag.config_bag: self.settings = option_bag.config_bag.context.cfgimpl_get_settings() def get(self): """get permissives value""" return self.settings.getpermissives(self.option_bag.option, self.option_bag.path) def set(self, permissives): """set permissives value""" option = self.option_bag.option self.settings.setpermissives(self.option_bag, permissives=permissives) def reset(self): """reset all personalised permissive""" self.set(frozenset()) class TiramisuOptionInformation(CommonTiramisuOption): """manage option informations""" allow_optiondescription = True slave_need_index = False def get(self, key, default=undefined): """get information for a key name""" path = self.option_bag.path values = self.option_bag.config_bag.context.cfgimpl_get_values() try: return values.get_information(key, default, path=path) except ValueError: option = self.option_bag.option return option.impl_get_information(key, default) def set(self, key, value): """set information for a key name""" path = self.option_bag.path values = self.option_bag.config_bag.context.cfgimpl_get_values() values.set_information(key, value, path=path) def reset(self, key): """remove information for a key name""" path = self.option_bag.path values = self.option_bag.config_bag.context.cfgimpl_get_values() values.del_information(key, path=path) class TiramisuOptionValue(CommonTiramisuOption): """manager option's value""" allow_optiondescription = True slave_need_index = False def _o_get(self): """get option's value""" option = self.option_bag.option self._test_slave_index() return self.subconfig.getattr(self._name, self.option_bag) def _o_set(self, value): """set a value for a specified option""" option = self.option_bag.option self._test_slave_index() values = self.option_bag.config_bag.context.cfgimpl_get_values() if isinstance(value, list): while undefined in value: idx = value.index(undefined) value[idx] = values.getdefaultvalue(self.option_bag, force_index=idx) elif value == undefined: value = values.getdefaultvalue(self.option_bag) self.subconfig.setattr(value, self.option_bag) def _m_pop(self, index): """pop value for a master option (only for master option)""" assert not self.option_bag.option.impl_is_symlinkoption(), _("can't delete a SymLinkOption") self.option_bag.config_bag.context.cfgimpl_get_values().reset_master(index, self.option_bag, self.subconfig) def _o_reset(self): """reset value for a value""" self._test_slave_index() self.subconfig.delattr(self.option_bag) def _m_reset(self, itself: bool=True, children: bool=False): """reset value for a MetaConfig""" if children: config_bag = self.option_bag.config_bag config_bag.context.reset(self.option_bag.path, config_bag) if itself: self._o_reset() def _g_reset(self): """reset value for a GroupConfig""" self.option_bag.config_bag.context.reset(self.option_bag.path) def _m_len_master(self): """length of master option (only for slave option)""" option = self.option_bag.option # for example if index is None if '_length' not in vars(self): self._length = self.subconfig.cfgimpl_get_length() return self._length def _s_len_slave(self): """length of slave option (only for slave option)""" option = self.option_bag.option # for example if index is None if '_length' not in vars(self): self._length = self.subconfig.cfgimpl_get_length_slave(self.option_bag) return self._length def _c_list(self): """all values available for an option (only for choiceoption)""" option = self.option_bag.option return option.impl_get_values(self.option_bag) def _od_dict(self, flatten=False, withvalue=undefined, withoption=None, fullpath=False): """return dict with path as key and value (only for optiondescription)""" self._get_option() name = self.option_bag.option.impl_getname() subconfig = self.subconfig.get_subconfig(name, self.option_bag) config_bag = self.option_bag.config_bag if config_bag.properties and 'warnings' in config_bag.properties: config_bag = config_bag.copy() config_bag.properties = config_bag.properties - {'warnings'} return subconfig.make_dict(config_bag=config_bag, flatten=flatten, fullpath=fullpath, withoption=withoption, withvalue=withvalue) def __getattr__(self, name: str) -> Callable: option = self.option_bag.option if name.startswith('_'): # not a valid function pass elif name == 'list' and isinstance(option, ChoiceOption): return self._c_list elif name == 'pop' and option.impl_is_master_slaves('master'): return self._m_pop elif name == 'len': if option.impl_is_master_slaves('slave'): return self._s_len_slave if option.impl_is_master_slaves('master'): return self._m_len_master elif name == 'dict' and option.impl_is_optiondescription(): return self._od_dict elif name == 'reset': if self.option_bag.config_bag.context.impl_type == 'group': return getattr(self, '_g_' + name) elif not option.impl_is_optiondescription(): if self.option_bag.config_bag.context.impl_type == 'meta': return getattr(self, '_m_' + name) return getattr(self, '_o_' + name) elif option and not option.impl_is_optiondescription(): return getattr(self, '_o_' + name) raise APIError(_('{} is unknown').format(name)) def registers(registers: Dict[str, type], prefix: str) -> None: for module_name in globals().keys(): if module_name != prefix and module_name.startswith(prefix): module = globals()[module_name] func_name = module_name[len(prefix):].lower() registers[func_name] = module class TiramisuOption(CommonTiramisu): registers = {} def __init__(self, name: Optional[str], path: Optional[str]=None, index: Optional[int]=None, subconfig: Union[None, KernelConfig, SubConfig]=None, config_bag: Optional[ConfigBag]=None, option_bag: Optional[OptionBag]=None) -> None: self._name = name self.subconfig = subconfig self._path = path self.index = index self.config_bag = config_bag self.option_bag = OptionBag() self.option_bag.path = self._path self.option_bag.index = self.index self.option_bag.config_bag = self.config_bag if not self.registers: registers(self.registers, self.__class__.__name__) def __getattr__(self, subfunc: str) -> Any: if subfunc in self.registers: return self.registers[subfunc](self._name, self.subconfig, self.option_bag) elif self._get_option().impl_is_optiondescription() and not subfunc.startswith('_'): return getattr(self, '_' + subfunc) raise APIError(_('please specify a valid sub function ({})').format(subfunc)) # pragma: no cover def _find(self, name: str, value=undefined, type=None, first: bool=False): """find an option by name (only for optiondescription)""" if not first: ret = [] option = self._get_option() oname = option.impl_getname() path = self.subconfig._get_subpath(oname) option_bag = OptionBag() option_bag.set_option(option, path, None, self.config_bag) subconfig = self.subconfig.get_subconfig(oname, option_bag) for path in subconfig.find(byname=name, byvalue=value, bytype=type, _subpath=self._path, config_bag=self.config_bag): subconfig, name = self.config_bag.context.cfgimpl_get_home_by_path(path, self.config_bag) t_option = TiramisuOption(name, path, None, # index for a slave ? subconfig, self.config_bag) if first: return t_option ret.append(t_option) return ret # # def _get(self, name): # self._get_option() # current_option = self.option_bag.option.impl_getchild(name, # self.config_bag, # self.subconfig.cfgimpl_get_path) # path = self.option_bag.path + '.' + name # option_bag= OptionBag() # option_bag.set_option(current_option, # path, # None, # self.config_bag) # if current_option.impl_is_optiondescription(): # subconfig = self.subconfig.getattr(name, # option_bag) # else: # subconfig = self.subconfig # return TiramisuOption(name, # path, # None, # subconfig, # self.config_bag, # option_bag) def _group_type(self): """get type for an optiondescription (only for optiondescription)""" return self._get_option().impl_get_group_type() def _list(self, type='option', group_type=None): """list options in an optiondescription (only for optiondescription)""" if type not in ('all', 'option', 'optiondescription'): raise APIError(_('unknown list type {}').format(type)) # pragma: no cover if group_type is not None and not isinstance(group_type, groups.GroupType): raise TypeError(_("unknown group_type: {0}").format(group_type)) # pragma: no cover def _filter(opt): if self.config_bag.properties: name = opt.impl_getname() path = subconfig._get_subpath(name) option_bag = OptionBag() option_bag.set_option(opt, path, None, self.config_bag) if opt.impl_is_optiondescription(): self.subconfig.get_subconfig(name, option_bag) else: subconfig.getattr(name, option_bag) option = self._get_option() name = option.impl_getname() path = self.subconfig._get_subpath(name) option_bag = OptionBag() option_bag.set_option(option, path, None, self.config_bag) subconfig = self.subconfig.get_subconfig(name, option_bag) for opt in option.impl_getchildren(self.config_bag): try: subsubconfig = _filter(opt) except PropertiesOptionError: continue if opt.impl_is_optiondescription(): if type == 'option' or (type == 'optiondescription' and group_type and \ opt.impl_get_group_type() != group_type): continue elif type == 'optiondescription': continue name = opt.impl_getname() yield TiramisuOption(name, subconfig._get_subpath(name), None, subconfig, self.config_bag) class TiramisuContext(TiramisuHelp): def __init__(self, config_bag: Optional[ConfigBag]) -> None: self.config_bag = config_bag class TiramisuContextInformation(TiramisuContext): """manage configuration informations""" def get(self, name, default=undefined): """get information for a key name""" return self.config_bag.context.impl_get_information(name, default) def set(self, name, value): """set information for a key name""" self.config_bag.context.impl_set_information(name, value) def reset(self, name): """remove information for a key name""" self.config_bag.context.impl_del_information(name) class TiramisuContextValue(TiramisuContext): """manager value""" def mandatory_warnings(self): """return path of options with mandatory property without any value""" return self.config_bag.context.cfgimpl_get_values().mandatory_warnings(self.config_bag) def set(self, path: str, value, index=None, only_config=undefined, force_default=undefined, force_default_if_same=undefined, force_dont_change_value=undefined): """set values for a GroupConfig or a MetaConfig""" kwargs = {} if only_config is not undefined: kwargs['only_config'] = only_config if force_default is not undefined: kwargs['force_default'] = force_default if force_default_if_same is not undefined: kwargs['force_default_if_same'] = force_default_if_same if force_dont_change_value is not undefined: kwargs['force_dont_change_value'] = force_dont_change_value return self.config_bag.context.set_value(path, index, value, self.config_bag, **kwargs) def dict(self, flatten=False, withvalue=undefined, withoption=None, fullpath=False): """return dict with path as key and value""" if not self.config_bag.properties: config_bag = self.config_bag else: config_bag = self.config_bag.copy() config_bag.properties = self.config_bag.properties - {'warnings'} return config_bag.context.make_dict(config_bag, flatten=flatten, fullpath=fullpath, withoption=withoption, withvalue=withvalue) def exportation(self, with_default_owner: bool=False): """export all values""" exportation = self.config_bag.context.cfgimpl_get_values()._p_.exportation() if not with_default_owner: exportation = [list(exportation[0]), list(exportation[1]), list(exportation[2]), list(exportation[3])] index = exportation[0].index(None) exportation[0].pop(index) exportation[1].pop(index) exportation[2].pop(index) exportation[3].pop(index) return exportation def importation(self, values): """import values""" if None not in values[0]: context_owner = self.config_bag.context.cfgimpl_get_values().get_context_owner() else: context_owner = None self.config_bag.context.cfgimpl_get_values()._p_.importation(values) self.config_bag.context.cfgimpl_reset_cache(None, None) if context_owner is not None: self.config_bag.context.cfgimpl_get_values()._p_.setvalue(None, None, context_owner, None, True) class TiramisuContextOwner(TiramisuContext): """manager value""" def get(self): """get default owner""" return self.config_bag.context.cfgimpl_get_values().get_context_owner() def set(self, owner): """set default owner""" try: obj_owner = getattr(owners, owner) except AttributeError: owners.addowner(owner) obj_owner = getattr(owners, owner) self.config_bag.context.cfgimpl_get_values().set_context_owner(obj_owner) class TiramisuContextProperty(TiramisuContext): """manage configuration properties""" def read_only(self): """set configuration to read only mode""" settings = self.config_bag.context.cfgimpl_get_settings() settings.read_only(self.config_bag.context) del self.config_bag.properties def read_write(self): """set configuration to read and write mode""" settings = self.config_bag.context.cfgimpl_get_settings() settings.read_write(self.config_bag.context) # #FIXME ? permissives = frozenset(settings.get_context_permissives() | frozenset(['hidden'])) settings.set_context_permissives(permissives) #/FIXME ? del self.config_bag.properties def add(self, prop): """add a configuration property""" props = self.get() props.add(prop) self.set(frozenset(props)) del self.config_bag.properties def pop(self, prop): """remove a configuration property""" props = self.get() if prop in props: props.remove(prop) self.set(frozenset(props)) del self.config_bag.properties def get(self): """get all configuration properties""" return set(self.config_bag.properties) def set(self, props): """personalise configuration properties""" context = self.config_bag.context context.cfgimpl_get_settings().set_context_properties(props, context) del self.config_bag.properties def reset(self, all=False): """remove configuration properties""" context = self.config_bag.context context.cfgimpl_get_settings().reset(None, context, all_properties=all) del self.config_bag.properties def exportation(self): """export configuration properties""" return self.config_bag.context.cfgimpl_get_settings()._p_.exportation() def importation(self, properties): """import configuration properties""" self.config_bag.context.cfgimpl_get_settings()._p_.importation(properties) self.config_bag.context.cfgimpl_reset_cache(None, None) del self.config_bag.properties class TiramisuContextPermissive(TiramisuContext): """manage configuration permissives""" def get(self): """get configuration permissives""" return self.config_bag.context.cfgimpl_get_settings().get_context_permissives() def set(self, permissives): """set configuration permissives""" self.config_bag.context.cfgimpl_get_settings().set_context_permissives(permissives) del self.config_bag.permissives def exportation(self): """export configuration permissives""" return self.config_bag.context.cfgimpl_get_settings()._pp_.exportation() def importation(self, permissives): """import configuration permissives""" self.config_bag.context.cfgimpl_get_settings()._pp_.importation(permissives) self.config_bag.context.cfgimpl_reset_cache(None, None) del self.config_bag.permissives class TiramisuContextOption(TiramisuContext): """manage option""" def find(self, name, value=undefined, type=None, first=False): """find an option by name""" if not first: ret = [] for path in self.config_bag.context.find(byname=name, byvalue=value, bytype=type, #_subpath=self._path, config_bag=self.config_bag): subconfig, name = self.config_bag.context.cfgimpl_get_home_by_path(path, self.config_bag) t_option = TiramisuOption(name, path, None, # index for a slave ? subconfig, self.config_bag) if first: return t_option ret.append(t_option) return ret def list(self, type='option', group_type=None, recursive=False): """list content of an optiondescription""" def _filter(opt): if self.config_bag.properties: name = opt.impl_getname() option_bag = OptionBag() option_bag.set_option(opt, name, None, self.config_bag) if opt.impl_is_optiondescription(): self.config_bag.context.cfgimpl_get_settings().validate_properties(option_bag) else: self.config_bag.context.getattr(name, option_bag) def _walk(option): for opt in option.impl_getchildren(self.config_bag): try: subsubconfig = _filter(opt) except PropertiesOptionError: continue if opt.impl_is_optiondescription(): if recursive: for toption in _walk(opt): yield toption if type == 'option' or (type == 'optiondescription' and \ group_type and opt.impl_get_group_type() != group_type): continue elif type == 'optiondescription': continue path = opt.impl_getpath() subconfig, name = self.config_bag.context.cfgimpl_get_home_by_path(path, self.config_bag) yield TiramisuOption(name, path, None, subconfig, self.config_bag) if type not in ('all', 'option', 'optiondescription'): raise APIError(_('unknown list type {}').format(type)) if group_type is not None and not isinstance(group_type, groups.GroupType): raise TypeError(_("unknown group_type: {0}").format(group_type)) option = self.config_bag.context.cfgimpl_get_description() for toption in _walk(option): yield toption class TiramisuContextConfig(TiramisuContext): """configuration methods""" def find(self, name: str, value=undefined, first: bool=False): """find a path from option name and optionnaly a value to MetaConfig or GroupConfig""" if first: return Config(self.config_bag.context.find_firsts(byname=name, byvalue=value, config_bag=self.config_bag)) else: # pragma: no cover raise APIError('not implemented yet') def name(self): return self.config_bag.context.impl_getname() def _c_copy(self, session_id=None, persistent=False, storage=None): return Config(self.config_bag.context.duplicate(session_id, persistent=persistent, storage=storage)) def _c_deepcopy(self, session_id=None, persistent=False, storage=None, metaconfig_prefix=None): return Config(self.config_bag.context.duplicate(session_id, persistent=persistent, storage=storage, metaconfig_prefix=metaconfig_prefix, deep=True)) def _c_metaconfig(self): return Config(self.config_bag.context.cfgimpl_get_meta()) def _m_new(self, session_id, persistent=False, type='config'): return Config(self.config_bag.context.new_config(session_id=session_id, persistent=persistent, type_=type)) def _m_pop(self, session_id): return Config(self.config_bag.context.pop_config(session_id=session_id)) def _m_list(self): return self._g_list() def _g_list(self): for child in self.config_bag.context.cfgimpl_get_children(): yield Config(child) def _m_reset(self): self._c_reset() def _c_reset(self): # Option's values context_owner = self.config_bag.context.cfgimpl_get_values().get_context_owner() self.config_bag.context.cfgimpl_get_values()._p_.importation((tuple(), tuple(), tuple(), tuple())) self.config_bag.context.cfgimpl_get_values()._p_.setvalue(None, None, context_owner, None, True) # Option's informations self.config_bag.context.cfgimpl_get_values()._p_.del_informations() # Option's properties self.config_bag.context.cfgimpl_get_settings()._p_.importation({}) # Option's permissives self.config_bag.context.cfgimpl_get_settings()._pp_.importation({}) # Remove cache self.config_bag.context.cfgimpl_reset_cache(None, None) def __getattr__(self, name: str) -> Callable: if not name.startswith('_'): try: if self.config_bag.context.impl_type == 'meta': return getattr(self, '_m_' + name) elif self.config_bag.context.impl_type == 'group': return getattr(self, '_g_' + name) elif self.config_bag.context.impl_type == 'config': return getattr(self, '_c_' + name) except APIError: # pragma: no cover raise APIError(_('{} is unknown').format(name)) raise APIError(_('{} is unknown').format(name)) # pragma: no cover class TiramisuDispatcher: pass class TiramisuAPI(TiramisuHelp): registers = {} def __init__(self, config) -> None: if not isinstance(config, ConfigBag): config = ConfigBag(context=config) self._config_bag = config if not self.registers: registers(self.registers, 'TiramisuContext') registers(self.registers, 'TiramisuDispatcher') def __getattr__(self, subfunc: str) -> Any: if subfunc == 'forcepermissive': config_bag = self._config_bag.copy() config_bag.set_permissive() return TiramisuAPI(config_bag) elif subfunc == 'unrestraint': config_bag = self._config_bag.copy() config_bag.properties = frozenset() return TiramisuAPI(config_bag) elif subfunc in self.registers: config_bag = self._config_bag del config_bag.permissives return self.registers[subfunc](config_bag) else: raise APIError(_('please specify a valid sub function ({})').format(subfunc)) class TiramisuDispatcherConfig(TiramisuDispatcher, TiramisuContextConfig): def __call__(self, path: Optional[str]): """select a child Tiramisu configuration (only with MetaConfig or GroupConfig)""" if path is None: return Config(self.config_bag) spaths = path.split('.') config = self.config_bag.context for spath in spaths: config = config.getconfig(spath) return Config(config) class TiramisuDispatcherOption(TiramisuDispatcher, TiramisuContextOption): subhelp = TiramisuOption def __call__(self, path: str, index: Optional[int]=None) -> TiramisuOption: """select a option (index only for slave option)""" if self.config_bag.context.impl_type == 'group': subpath, name = path.rsplit('.', 1) subconfig = None else: subconfig, name = self.config_bag.context.cfgimpl_get_home_by_path(path, self.config_bag) return TiramisuOption(name, path, index, subconfig, self.config_bag) class Config(TiramisuAPI): def __init__(self, descr: OptionDescription, session_id: str=None, persistent: bool=False, storage=None) -> None: if isinstance(descr, OptionDescription): config = KernelConfig(descr, session_id=session_id, persistent=persistent, storage=storage) else: config = descr super().__init__(config) class MetaConfig(TiramisuAPI): def __init__(self, children, session_id: Union[str, None]=None, persistent: bool=False, optiondescription: Union[OptionDescription, None]=None) -> None: _children = [] for child in children: if isinstance(child, TiramisuAPI): _children.append(child._config_bag.context) else: _children.append(child) config = KernelMetaConfig(_children, session_id=session_id, persistent=persistent, optiondescription=optiondescription) super().__init__(config) class GroupConfig(TiramisuAPI): def __init__(self, children, session_id: Union[str, None]=None) -> None: _children = [] for child in children: _children.append(child._config_bag.context) config = KernelGroupConfig(_children, session_id=session_id) super().__init__(config) def getapi(config: Config): # pragma: no cover """instanciate Config :param config: KernelConfig object :type descr: an instance of ``config.KernelConfig`` """ return config