# Copyright (C) 2012-2021 Team tiramisu (see AUTHORS for all contributors)
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by the
# Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# The original `Config` design model is unproudly borrowed from
# the rough gus of pypy: pypy: http://codespeak.net/svn/pypy/dist/pypy/config/
# the whole pypy project is under MIT licence
# ____________________________________________________________
"enables us to carry out a calculation and return an option's value"
from typing import Any, Optional, Union, Callable, Dict, List
from types import CoroutineType
from itertools import chain

from .error import PropertiesOptionError, ConfigError, LeadershipError, ValueWarning
from .i18n import _
from .setting import undefined, ConfigBag, OptionBag, Undefined
# ____________________________________________________________


class Params:
    """Positional and keyword parameters handed to a calculation function.

    ``args`` is stored as a tuple of ``Param`` objects and ``kwargs`` as a
    dict whose values are ``Param`` objects.
    """
    __slots__ = ('args', 'kwargs')

    def __init__(self, args=None, kwargs=None, **kwgs):
        """
        :param args: a single ``Param`` or a tuple of ``Param`` (default: empty)
        :param kwargs: a dict whose values are ``Param`` (default: empty)
        :param kwgs: extra keyword parameters, merged over ``kwargs``
        :raises ValueError: if ``args`` is not a tuple/``Param``, if ``kwargs``
                            is not a dict, or if any value is not a ``Param``
        """
        if args is None:
            args = tuple()
        if kwargs is None:
            kwargs = {}
        if isinstance(args, Param):
            args = (args,)
        else:
            if not isinstance(args, tuple):
                raise ValueError(_('args in params must be a tuple'))
            for arg in args:
                if not isinstance(arg, Param):
                    raise ValueError(_('arg in params must be a Param'))
        # check the type before merging so that a wrong type is reported with
        # the intended ValueError
        if not isinstance(kwargs, dict):
            raise ValueError(_('kwargs in params must be a dict'))
        if kwgs:
            # build a new dict: the dict supplied by the caller is left untouched
            kwargs = {**kwargs, **kwgs}
        for arg in kwargs.values():
            if not isinstance(arg, Param):
                raise ValueError(_('arg in params must be a Param'))
        self.args = args
        self.kwargs = kwargs


class Param:
    """Base class of every parameter accepted by ``Params``."""
    pass


class ParamOption(Param):
    """Parameter replaced by the value of another option."""
    __slots__ = ('todict',
                 'option',
                 'notraisepropertyerror',
                 'raisepropertyerror')

    def __init__(self,
                 option: 'Option',
                 notraisepropertyerror: bool=False,
                 raisepropertyerror: bool=False,
                 todict: bool=False) -> None:
        """
        :param option: the option whose value is wanted; if it is a symlink
                       option, the option returned by ``impl_getopt()`` is stored
        :param notraisepropertyerror: the PropertiesOptionError is re-raised by
                                      ``manager_callback`` instead of being
                                      converted to a ConfigError;
                                      ``carry_out_calculation`` then leaves the
                                      parameter out (unless ``todict``)
        :param raisepropertyerror: the PropertiesOptionError is propagated by
                                   ``carry_out_calculation``
        :param todict: send a dict with 'name' and 'value' keys (or a
                       'propertyerror' key) instead of the bare value
        """
        if __debug__ and not hasattr(option, 'impl_is_symlinkoption'):
            raise ValueError(_('paramoption needs an option not {}').format(type(option)))
        if option.impl_is_symlinkoption():
            cur_opt = option.impl_getopt()
        else:
            cur_opt = option
        assert isinstance(notraisepropertyerror, bool), _('param must have a boolean not a {} for notraisepropertyerror').format(type(notraisepropertyerror))
        assert isinstance(raisepropertyerror, bool), _('param must have a boolean not a {} for raisepropertyerror').format(type(raisepropertyerror))
        self.todict = todict
        self.option = cur_opt
        self.notraisepropertyerror = notraisepropertyerror
        self.raisepropertyerror = raisepropertyerror


class ParamDynOption(ParamOption):
    """``ParamOption`` with an explicit suffix and dynoptiondescription."""
    # 'dynoptiondescription' is assigned in __init__, so it is declared here too
    __slots__ = ('suffix', 'dynoptiondescription')

    def __init__(self,
                 option: 'Option',
                 suffix: str,
                 dynoptiondescription: 'DynOptionDescription',
                 notraisepropertyerror: bool=False,
                 raisepropertyerror: bool=False,
                 todict: bool=False,
                 ) -> None:
        super().__init__(option,
                         notraisepropertyerror,
                         raisepropertyerror,
                         todict,
                         )
        self.suffix = suffix
        self.dynoptiondescription = dynoptiondescription


class ParamSelfOption(Param):
    """Parameter replaced by the value of the option being calculated."""
    __slots__ = ('todict', 'whole')

    def __init__(self,
                 todict: bool=False,
                 whole: bool=undefined) -> None:
        """whole: send all value for a multi, not only indexed value"""
        self.todict = todict
        # 'whole' is only set when given: manager_callback tests its presence
        # with hasattr()
        if whole is not undefined:
            self.whole = whole


class ParamValue(Param):
    """Parameter replaced by a fixed value."""
    __slots__ = ('value',)

    def __init__(self, value):
        self.value = value


class ParamInformation(Param):
    """Parameter replaced by an information retrieved from the context."""
    # 'default_value' is assigned in __init__, so it is declared here too
    __slots__ = ('information_name', 'default_value')

    def __init__(self,
                 information_name: str,
                 default_value: Any=undefined,
                 ) -> None:
        self.information_name = information_name
        self.default_value = default_value


class ParamSelfInformation(ParamInformation):
    """``ParamInformation`` looked up with an option bag for the current option."""
    __slots__ = tuple()


class ParamIndex(Param):
    """Parameter replaced by the current index."""
    __slots__ = tuple()


class ParamSuffix(Param):
    """Parameter replaced by the suffix of the current (dynamic) option."""
    __slots__ = tuple()


class Calculation:
    """A function and the ``Params`` used to call it."""
    __slots__ = ('function',
                 'params',
                 'help_function',
                 '_has_index',
                 'warnings_only')

    def __init__(self,
                 function: Callable,
                 params: Params=Params(),
                 help_function: Optional[Callable]=None,
                 warnings_only: bool=False):
        """
        :param function: the function to call
        :param params: the parameters sent to ``function``
        :param help_function: optional function called by ``help()`` instead
                              of ``function``
        :param warnings_only: the ``warnings_only`` attribute is only set when
                              this is ``True``
        """
        assert isinstance(function, Callable), _('first argument ({0}) must be a function').format(function)
        if help_function:
            assert isinstance(help_function, Callable), _('help_function ({0}) must be a function').format(help_function)
            self.help_function = help_function
        else:
            self.help_function = None
        self.function = function
        self.params = params
        # a ParamIndex settles has_index() immediately; otherwise '_has_index'
        # stays unset until has_index() is called
        for arg in chain(self.params.args, self.params.kwargs.values()):
            if isinstance(arg, ParamIndex):
                self._has_index = True
                break
        if warnings_only is True:
            self.warnings_only = warnings_only

    async def execute(self,
                      option_bag: OptionBag,
                      leadership_must_have_index: bool=False,
                      orig_value: Any=undefined,
                      allow_value_error: bool=False,
                      force_value_warning: bool=False,
                      for_settings: bool=False,
                      ) -> Any:
        """Run ``function`` for the option, index and config of ``option_bag``."""
        return await carry_out_calculation(option_bag.option,
                                           callback=self.function,
                                           callback_params=self.params,
                                           index=option_bag.index,
                                           config_bag=option_bag.config_bag,
                                           leadership_must_have_index=leadership_must_have_index,
                                           orig_value=orig_value,
                                           allow_value_error=allow_value_error,
                                           force_value_warning=force_value_warning,
                                           for_settings=for_settings,
                                           )

    async def help(self,
                   option_bag: OptionBag,
                   leadership_must_have_index: bool=False,
                   for_settings: bool=False,
                   ) -> str:
        """Run ``help_function`` if there is one, otherwise behave like ``execute()``."""
        if not self.help_function:
            return await self.execute(option_bag,
                                      leadership_must_have_index=leadership_must_have_index,
                                      for_settings=for_settings,
                                      )
        return await carry_out_calculation(option_bag.option,
                                           callback=self.help_function,
                                           callback_params=self.params,
                                           index=option_bag.index,
                                           config_bag=option_bag.config_bag,
                                           leadership_must_have_index=leadership_must_have_index,
                                           for_settings=for_settings,
                                           )

    def has_index(self, current_option):
        """Tell whether the calculation depends on an index.

        True if a ``ParamIndex`` is part of the parameters, or if a
        ``ParamOption`` refers to an option in the same leadership group as
        ``current_option``. The answer is stored on the first call and
        returned as is afterwards, whatever ``current_option`` is.
        """
        if hasattr(self, '_has_index'):
            return self._has_index
        self._has_index = False
        for arg in chain(self.params.args, self.params.kwargs.values()):
            if isinstance(arg, ParamOption) and arg.option.impl_get_leadership() and \
                    arg.option.impl_get_leadership().in_same_group(current_option):
                self._has_index = True
                break
        return self._has_index


class Break(Exception):
    """Raised by ``manager_callback`` to make ``carry_out_calculation`` skip a parameter."""
    pass


async def manager_callback(callbk: Param,
                           option,
                           index: Optional[int],
                           orig_value,
                           config_bag: ConfigBag,
                           leadership_must_have_index: bool,
                           for_settings: bool,
                           ) -> Any:
    """replace Param by true value

    :param callbk: the parameter to resolve
    :param option: the option for which the calculation is carried out
    :param index: the index of the value to calculate, if any
    :param orig_value: the value used for a ``ParamSelfOption`` (``undefined``
                       to read it from the config)
    :param config_bag: the config bag; ``undefined`` is returned when an
                       option's value is needed and it is ``undefined``
    :param leadership_must_have_index: raise ``Break`` when the needed option
                                       is in a leadership and ``index`` is None
    :param for_settings: changes how the properties of the copied config bag
                         are built in ``get_option_bag``
    :raises Break: see ``leadership_must_have_index``
    :raises ConfigError: if the value cannot be retrieved or if ``callbk`` is
                         of an unknown type
    """
    def calc_index(callbk, index, same_leadership):
        # return the index to apply, or None if the whole value is wanted
        if index is not None:
            if hasattr(callbk, 'whole'):
                whole = callbk.whole
            else:
                # if value is same_leadership, follower are isolate by default
                # otherwise option is a whole option
                whole = not same_leadership
            if not whole:
                return index
        return None

    async def calc_self(callbk, option, index, value, config_bag):
        # index must be apply only if follower
        is_follower = option.impl_is_follower()
        apply_index = calc_index(callbk, index, is_follower)
        if value is undefined or (apply_index is None and is_follower):
            if config_bag is undefined:
                return undefined
            path = option.impl_getpath()
            option_bag = await get_option_bag(config_bag,
                                              option,
                                              apply_index,
                                              True)
            new_value = await get_value(callbk, option_bag, path)
            if apply_index is None and is_follower:
                # whole value of a follower: put the given value at its index
                new_value[index] = value
            value = new_value
        elif apply_index is not None and not is_follower:
            value = value[apply_index]
        return value

    async def get_value(callbk,
                        option_bag,
                        path,
                        ):
        try:
            # get value
            value = await config_bag.context.getattr(path,
                                                     option_bag)
        except PropertiesOptionError as err:
            # raise PropertiesOptionError (which is catched) because must not add value None in carry_out_calculation
            # calc_self() calls this function with a ParamSelfOption, which has
            # neither flag: a missing flag is read as False so that the
            # ConfigError below is raised instead of an AttributeError
            if getattr(callbk, 'notraisepropertyerror', False) or \
                    getattr(callbk, 'raisepropertyerror', False):
                raise err
            raise ConfigError(_('unable to carry out a calculation for "{}"'
                                ', {}').format(option.impl_get_display_name(), err), err)
        except ValueError as err:
            raise ValueError(_('the option "{0}" is used in a calculation but is invalid ({1})').format(option_bag.option.impl_get_display_name(), err))
        except AttributeError as err:
            raise ConfigError(_('impossible to calculate "{0}", {1}').format(option_bag.option.impl_get_display_name(),
                                                                             err,
                                                                             ))
        return value

    async def get_option_bag(config_bag,
                             opt,
                             index_,
                             self_calc):
        # don't validate if option is option that we tried to validate
        # work on a copy: the config bag received by the caller is not modified
        config_bag = config_bag.copy()
        if for_settings:
            config_bag.properties = config_bag.true_properties - {'warnings'}
        config_bag.set_permissive()
        if not for_settings:
            config_bag.properties -= {'warnings'}
        option_bag = OptionBag()
        option_bag.set_option(opt,
                              index_,
                              config_bag)
        if not self_calc:
            option_bag.properties = await config_bag.context.cfgimpl_get_settings().getproperties(option_bag)
        else:
            option_bag.config_bag.unrestraint()
            option_bag.config_bag.remove_validation()
            # if we are in properties calculation, cannot calculated properties
            option_bag.properties = await config_bag.context.cfgimpl_get_settings().getproperties(option_bag,
                                                                                                  apply_requires=False)
        return option_bag

    if isinstance(callbk, ParamValue):
        return callbk.value

    if isinstance(callbk, ParamInformation):
        if isinstance(callbk, ParamSelfInformation):
            option_bag = OptionBag()
            option_bag.set_option(option,
                                  index,
                                  config_bag,
                                  )
        else:
            option_bag = None
        try:
            return await config_bag.context.impl_get_information(config_bag,
                                                                 option_bag,
                                                                 callbk.information_name,
                                                                 callbk.default_value,
                                                                 )
        except ValueError as err:
            raise ConfigError(_('option "{}" cannot be calculated: {}').format(option.impl_get_display_name(),
                                                                               str(err),
                                                                               ))

    if isinstance(callbk, ParamIndex):
        return index

    if isinstance(callbk, ParamSuffix):
        if not option.issubdyn():
            raise ConfigError(_('option "{}" is not in a dynoptiondescription').format(option.impl_get_display_name()))
        return option.impl_getsuffix()

    if isinstance(callbk, ParamSelfOption):
        if leadership_must_have_index and option.impl_get_leadership() and index is None:
            raise Break()
        value = await calc_self(callbk, option, index, orig_value, config_bag)
        if not callbk.todict:
            return value
        return {'name': option.impl_get_display_name(),
                'value': value}

    if isinstance(callbk, ParamOption):
        callbk_option = callbk.option
        if callbk_option.issubdyn():
            if isinstance(callbk, ParamDynOption):
                subdyn = callbk.dynoptiondescription
                rootpath = subdyn.impl_getpath() + callbk.suffix
                suffix = callbk.suffix
            else:
                if not option.impl_is_dynsymlinkoption():
                    msg = 'option "{}" is not dynamic but is an argument of the dynamic option "{}" in a callback'
                    raise ConfigError(_(msg).format(option.impl_get_display_name(),
                                                    callbk_option.impl_get_display_name(),
                                                    ))
                #FIXME in same dynamic option?
                rootpath = option.rootpath
                suffix = option.impl_getsuffix()
                subdyn = callbk_option.getsubdyn()
            callbk_option = callbk_option.to_dynoption(rootpath,
                                                       suffix,
                                                       subdyn)
        if leadership_must_have_index and callbk_option.impl_get_leadership() and index is None:
            raise Break()
        if config_bag is undefined:
            return undefined
        if index is not None and callbk_option.impl_get_leadership() and \
                callbk_option.impl_get_leadership().in_same_group(option):
            if not callbk_option.impl_is_follower():
                # leader
                # the whole value is retrieved, then indexed below
                index_ = None
                with_index = True
            else:
                # follower
                index_ = index
                with_index = False
        else:
            index_ = None
            with_index = False
        path = callbk_option.impl_getpath()
        option_bag = await get_option_bag(config_bag,
                                          callbk_option,
                                          index_,
                                          False)
        value = await get_value(callbk,
                                option_bag,
                                path,
                                )
        if with_index:
            value = value[index]
        if not callbk.todict:
            return value
        return {'name': callbk_option.impl_get_display_name(),
                'value': value}
    raise ConfigError(_('unknown callback type {} in option {}').format(callbk,
                                                                        option.impl_get_display_name()))


async def carry_out_calculation(option,
                                callback: Callable,
                                callback_params: Optional[Params],
                                index: Optional[int],
                                config_bag: Optional[ConfigBag],
                                orig_value=undefined,
                                leadership_must_have_index: bool=False,
                                allow_value_error: bool=False,
                                force_value_warning: bool=False,
                                for_settings: bool=False,
                                ):
    """a function that carries out a calculation for an option's value

    :param option: the option
    :param callback: the callback function
    :param callback_params: the callback's parameters: each ``Param`` of
                            ``callback_params.args`` becomes a positional
                            argument and each one of
                            ``callback_params.kwargs`` a keyword argument
    :param index: if an option is multi, only calculates the nth value
    :param config_bag: the config bag used to retrieve the values
    :param orig_value: the value used for a ``ParamSelfOption``
    :param leadership_must_have_index: skip the parameters that need an index
                                       when ``index`` is None
    :param allow_value_error: to know if carry_out_calculation can return
                              ValueError or ValueWarning (for example if it's
                              a validation)
    :param force_value_warning: transform valueError to ValueWarning object
    :param for_settings: passed on to ``manager_callback``
    :returns: the value returned by ``callback``, or ``undefined`` as soon as
              one parameter resolves to ``undefined``
    :raises LeadershipError: if ``callback`` returns a list for a follower
                             option that is not a submulti
    """
    def fake_items(iterator):
        # give positional parameters the same (key, value) shape as keyword
        # ones, with None as the key
        return ((None, i) for i in iterator)
    args = []
    kwargs = {}
    if callback_params:
        for key, callbk in chain(fake_items(callback_params.args), callback_params.kwargs.items()):
            try:
                value = await manager_callback(callbk,
                                               option,
                                               index,
                                               orig_value,
                                               config_bag,
                                               leadership_must_have_index,
                                               for_settings,
                                               )
                if value is undefined:
                    return undefined
                if key is None:
                    args.append(value)
                else:
                    kwargs[key] = value
            except PropertiesOptionError as err:
                if callbk.raisepropertyerror:
                    raise err
                # otherwise the parameter is left out, unless a dict is wanted
                if callbk.todict:
                    if key is None:
                        args.append({'propertyerror': str(err)})
                    else:
                        kwargs[key] = {'propertyerror': str(err)}
            except Break:
                continue
    ret = await calculate(option,
                          callback,
                          allow_value_error,
                          force_value_warning,
                          args,
                          kwargs)
    if isinstance(ret, list) and not option.impl_is_dynoptiondescription() and \
            option.impl_is_follower() and not option.impl_is_submulti():
        if args or kwargs:
            raise LeadershipError(_('the "{}" function with positional arguments "{}" '
                                    'and keyword arguments "{}" must not return '
                                    'a list ("{}") for the follower option "{}"'
                                    '').format(callback.__name__,
                                               args,
                                               kwargs,
                                               ret,
                                               option.impl_get_display_name()))
        else:
            raise LeadershipError(_('the "{}" function must not return a list ("{}") '
                                    'for the follower option "{}"'
                                    '').format(callback.__name__,
                                               ret,
                                               option.impl_get_display_name()))
    return ret


async def calculate(option,
                    callback: Callable,
                    allow_value_error: bool,
                    force_value_warning: bool,
                    args,
                    kwargs,
                    ):
    """wrapper that launches the 'callback'

    :param option: the option, only used to build the error message
    :param callback: callback function
    :param allow_value_error: let ValueError and ValueWarning raised by the
                              callback go through
    :param force_value_warning: with ``allow_value_error``, raise a
                                ValueWarning built from the error's message
    :param args: in the callback's arity, the unnamed parameters
    :param kwargs: in the callback's arity, the named parameters
    :returns: the callback's result (awaited first if it is a coroutine)
    :raises ConfigError: for any other exception raised by the callback
    """
    try:
        ret = callback(*args, **kwargs)
        if isinstance(ret, CoroutineType):
            ret = await ret
        return ret
    except (ValueError, ValueWarning) as err:
        if allow_value_error:
            if force_value_warning:
                raise ValueWarning(str(err))
            raise err
        # keep a reference: 'err' is unbound when the except clause ends
        error = err
    except Exception as err:
        # import traceback
        # traceback.print_exc()
        error = err
    if args or kwargs:
        msg = _('unexpected error "{0}" in function "{1}" with arguments "{3}" and "{4}" '
                'for option "{2}"').format(str(error),
                                           callback.__name__,
                                           option.impl_get_display_name(),
                                           args,
                                           kwargs)
    else:
        msg = _('unexpected error "{0}" in function "{1}" for option "{2}"'
                '').format(str(error),
                           callback.__name__,
                           option.impl_get_display_name())
    raise ConfigError(msg) from error