482 lines
20 KiB
Python
482 lines
20 KiB
Python
# Copyright (C) 2012-2020 Team tiramisu (see AUTHORS for all contributors)
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify it
|
|
# under the terms of the GNU Lesser General Public License as published by the
|
|
# Free Software Foundation, either version 3 of the License, or (at your
|
|
# option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful, but WITHOUT
|
|
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
|
|
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
|
|
# details.
|
|
#
|
|
# You should have received a copy of the GNU Lesser General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
#
|
|
# The original `Config` design model is unproudly borrowed from
|
|
# the rough guts of pypy: http://codespeak.net/svn/pypy/dist/pypy/config/
|
|
# the whole pypy project is under MIT licence
|
|
# ____________________________________________________________
|
|
"enables us to carry out a calculation and return an option's value"
|
|
from typing import Any, Optional, Union, Callable, Dict, List
|
|
from itertools import chain
|
|
|
|
from .error import PropertiesOptionError, ConfigError, LeadershipError, ValueWarning
|
|
from .i18n import _
|
|
from .setting import undefined, ConfigBag, OptionBag, Undefined
|
|
# ____________________________________________________________
|
|
|
|
|
|
class Params:
    """Hold the positional and keyword ``Param`` objects given to a callback.

    :param args: a single ``Param`` or a tuple of ``Param`` (default: empty
                 tuple)
    :param kwargs: a dict whose values are ``Param`` (default: empty dict)
    :param kwgs: extra keyword parameters, merged over ``kwargs``
    :raises ValueError: if ``args`` is neither a ``Param`` nor a tuple of
                        ``Param``, if ``kwargs`` is not a dict, or if one of
                        its values is not a ``Param``
    """
    __slots__ = ('args', 'kwargs')

    def __init__(self, args=None, kwargs=None, **kwgs):
        if args is None:
            args = tuple()
        if kwargs is None:
            kwargs = {}
        if isinstance(args, Param):
            # a lone Param is accepted as a shortcut for a 1-tuple
            args = (args,)
        else:
            if not isinstance(args, tuple):
                raise ValueError(_('args in params must be a tuple'))
            for arg in args:
                if not isinstance(arg, Param):
                    raise ValueError(_('arg in params must be a Param'))
        # check the type before merging, so that a wrong type is reported
        # with the dedicated ValueError and not by a failing merge
        if not isinstance(kwargs, dict):
            raise ValueError(_('kwargs in params must be a dict'))
        if kwgs:
            # merge into a new dict: the caller's dict must not be modified
            kwargs = {**kwargs, **kwgs}
        for arg in kwargs.values():
            if not isinstance(arg, Param):
                raise ValueError(_('arg in params must be a Param'))
        self.args = args
        self.kwargs = kwargs
|
|
|
|
|
|
class Param:
    """Common base class of every parameter usable in a ``Params``.

    It only serves as a marker for ``isinstance`` checks and carries no
    behaviour of its own.
    """
|
|
|
|
|
|
class ParamOption(Param):
    """Parameter that refers to an option.

    :param option: the option; it must provide ``impl_is_symlinkoption``
    :param notraisepropertyerror: boolean flag, stored as is
    :param raisepropertyerror: boolean flag, stored as is
    :param todict: stored as is
    :raises ValueError: (only when ``__debug__`` is true) if ``option`` has no
                        ``impl_is_symlinkoption`` attribute
    :raises AssertionError: if one of the two *propertyerror* flags is not a
                            boolean
    """
    __slots__ = ('todict',
                 'option',
                 'notraisepropertyerror',
                 'raisepropertyerror')

    def __init__(self,
                 option: 'Option',
                 notraisepropertyerror: bool=False,
                 raisepropertyerror: bool=False,
                 todict: bool=False) -> None:
        if __debug__ and not hasattr(option, 'impl_is_symlinkoption'):
            raise ValueError(_('paramoption needs an option not {}').format(type(option)))
        # for a symlink option, keep what impl_getopt() returns rather than
        # the symlink itself
        target = option.impl_getopt() if option.impl_is_symlinkoption() else option
        assert isinstance(notraisepropertyerror, bool), _('param must have a boolean not a {} for notraisepropertyerror').format(type(notraisepropertyerror))
        assert isinstance(raisepropertyerror, bool), _('param must have a boolean not a {} for raisepropertyerror').format(type(raisepropertyerror))
        self.option = target
        self.notraisepropertyerror = notraisepropertyerror
        self.raisepropertyerror = raisepropertyerror
        self.todict = todict
|
|
|
|
|
|
class ParamDynOption(ParamOption):
    """``ParamOption`` with an explicit suffix and dynoptiondescription.

    :param option: the option (see ``ParamOption``)
    :param suffix: stored as is
    :param dynoptiondescription: stored as is
    :param notraisepropertyerror: forwarded to ``ParamOption``
    :param raisepropertyerror: forwarded to ``ParamOption``
    :param todict: forwarded to ``ParamOption``
    """
    # every attribute assigned in __init__ is declared here, so the class
    # does not depend on a base class providing an instance __dict__
    __slots__ = ('suffix',
                 'dynoptiondescription',
                 )

    def __init__(self,
                 option: 'Option',
                 suffix: str,
                 dynoptiondescription: 'DynOptionDescription',
                 notraisepropertyerror: bool=False,
                 raisepropertyerror: bool=False,
                 todict: bool=False,
                 ) -> None:
        super().__init__(option,
                         notraisepropertyerror,
                         raisepropertyerror,
                         todict,
                         )
        self.suffix = suffix
        self.dynoptiondescription = dynoptiondescription
|
|
|
|
|
|
class ParamSelfOption(Param):
    """Parameter that refers to the option being calculated itself."""
    __slots__ = ('todict', 'whole')

    def __init__(self,
                 todict: bool=False,
                 whole: bool=undefined) -> None:
        """Store the flags.

        :param todict: stored as is
        :param whole: send all value for a multi, not only indexed value;
                      the attribute is only set when a value is given
        """
        if whole is not undefined:
            self.whole = whole
        self.todict = todict
|
|
|
|
|
|
class ParamValue(Param):
    """Parameter that carries a fixed value, kept in ``value``."""
    __slots__ = ('value',)

    def __init__(self, value):
        """:param value: the value to store, without any check"""
        self.value = value
|
|
|
|
|
|
class ParamIndex(Param):
    """Marker parameter without any attribute of its own."""
    __slots__ = ()
|
|
|
|
|
|
class ParamSuffix(Param):
    """Marker parameter without any attribute of its own."""
    __slots__ = ()
|
|
|
|
|
|
class Calculation:
    """Associate a callback function with the ``Params`` used to call it.

    The attributes ``_has_index`` and ``warnings_only`` are only set in some
    cases (see ``__init__`` and ``has_index``).
    """
    __slots__ = ('function',
                 'params',
                 'help_function',
                 '_has_index',
                 'warnings_only')

    def __init__(self,
                 function: Callable,
                 params: Optional[Params]=None,
                 help_function: Optional[Callable]=None,
                 warnings_only: bool=False):
        """Store the callback and its parameters.

        :param function: the callback
        :param params: the callback's parameters; a new empty ``Params`` is
                       created when omitted
        :param help_function: optional callback used by ``help``
        :param warnings_only: the ``warnings_only`` attribute is only set when
                              this is exactly ``True``
        :raises AssertionError: if ``function`` or ``help_function`` is not
                                callable
        """
        assert isinstance(function, Callable), _('first argument ({0}) must be a function').format(function)
        if help_function:
            assert isinstance(help_function, Callable), _('help_function ({0}) must be a function').format(help_function)
            self.help_function = help_function
        else:
            self.help_function = None
        if params is None:
            # build the default per instance: a default evaluated once in the
            # signature would be shared by every Calculation
            params = Params()
        self.function = function
        self.params = params
        # a ParamIndex among the parameters settles has_index() right now
        for arg in chain(self.params.args, self.params.kwargs.values()):
            if isinstance(arg, ParamIndex):
                self._has_index = True
                break
        if warnings_only is True:
            self.warnings_only = warnings_only

    async def execute(self,
                      option_bag: OptionBag,
                      leadership_must_have_index: bool=False,
                      orig_value: Any=undefined,
                      allow_value_error=False,
                      force_value_warning=False,
                      ) -> Any:
        """Run ``function`` through ``carry_out_calculation``.

        The option, index and config_bag are read from ``option_bag``; the
        other arguments are forwarded unchanged.
        """
        return await carry_out_calculation(option_bag.option,
                                           callback=self.function,
                                           callback_params=self.params,
                                           index=option_bag.index,
                                           config_bag=option_bag.config_bag,
                                           leadership_must_have_index=leadership_must_have_index,
                                           orig_value=orig_value,
                                           allow_value_error=allow_value_error,
                                           force_value_warning=force_value_warning,
                                           )

    async def help(self,
                   option_bag: OptionBag,
                   leadership_must_have_index: bool=False) -> str:
        """Run ``help_function`` with the same parameters as ``function``.

        Without ``help_function`` the result of ``execute`` is returned.
        """
        if not self.help_function:
            return await self.execute(option_bag,
                                      leadership_must_have_index=leadership_must_have_index)
        return await carry_out_calculation(option_bag.option,
                                           callback=self.help_function,
                                           callback_params=self.params,
                                           index=option_bag.index,
                                           config_bag=option_bag.config_bag,
                                           leadership_must_have_index=leadership_must_have_index)

    def has_index(self, current_option):
        """Tell whether this calculation depends on an index.

        The answer is ``True`` when a parameter is a ``ParamIndex`` (found in
        ``__init__``) or a ``ParamOption`` whose option has a leadership that
        is in the same group as ``current_option``.
        The result is stored on the first call: later calls return the stored
        value whatever ``current_option`` is.
        """
        if hasattr(self, '_has_index'):
            return self._has_index
        self._has_index = False
        for arg in chain(self.params.args, self.params.kwargs.values()):
            if isinstance(arg, ParamOption) and arg.option.impl_get_leadership() and \
                    arg.option.impl_get_leadership().in_same_group(current_option):
                self._has_index = True
                break
        return self._has_index
|
|
|
|
|
|
class Break(Exception):
    """Raised by ``manager_callback`` and caught by ``carry_out_calculation``,
    which then skips the parameter being resolved.
    """
|
|
|
|
|
|
async def manager_callback(callbk: Union[ParamOption, ParamValue],
                           option,
                           index: Optional[int],
                           orig_value,
                           config_bag: ConfigBag,
                           leadership_must_have_index: bool) -> Any:
    """replace Param by true value

    :param callbk: the Param to resolve
    :param option: the option for which the calculation is carried out
    :param index: the index being calculated, or None
    :param orig_value: value handed to a ParamSelfOption (may be `undefined`)
    :param config_bag: the current ConfigBag (may be `undefined`)
    :param leadership_must_have_index: when True, an option that has a
                                       leadership cannot be resolved without
                                       an index: Break is raised instead
    :returns: the resolved value, a dict with the keys 'name' and 'value'
              when `callbk.todict` is set, or `undefined` when `config_bag`
              is `undefined` and a value has to be read
    :raises Break: see `leadership_must_have_index`
    :raises ConfigError: for an unknown Param type, for a ParamSuffix used on
                         an option that is not in a dynoptiondescription, or
                         when the value of an option cannot be read
    """
    def calc_index(callbk, index, same_leadership):
        """return `index` if only the indexed value is wanted, None otherwise

        `callbk.whole` decides when it is set, otherwise the whole value is
        wanted unless `same_leadership` is true.
        """
        if index is not None:
            if hasattr(callbk, 'whole'):
                whole = callbk.whole
            else:
                # if value is same_leadership, follower are isolate by default
                # otherwise option is a whole option
                whole = not same_leadership
            if not whole:
                return index
        return None

    async def calc_self(callbk, option, index, value, config_bag):
        """return the value of the option itself (for a ParamSelfOption)"""
        # index must be apply only if follower
        is_follower = option.impl_is_follower()
        apply_index = calc_index(callbk, index, is_follower)
        if value is undefined or (apply_index is None and is_follower):
            # the value has to be read from the config
            if config_bag is undefined:
                return undefined
            path = option.impl_getpath()
            option_bag = await get_option_bag(config_bag,
                                              option,
                                              apply_index,
                                              True)
            new_value = await get_value(callbk, option_bag, path)
            if apply_index is None and is_follower:
                # whole value of a follower: put the given value at `index`
                new_value[index] = value
            value = new_value
        elif apply_index is not None and not is_follower:
            # keep only the indexed element of the given value
            value = value[apply_index]
        return value

    async def get_value(callbk,
                        option_bag,
                        path,
                        ):
        """read the value at `path`, converting the errors

        `config_bag` and `option` are the arguments of manager_callback, not
        the ones stored in `option_bag`.
        """
        try:
            # get value
            value = await config_bag.context.getattr(path,
                                                     option_bag)
        except PropertiesOptionError as err:
            # raise PropertiesOptionError (which is catched) because must not add value None in carry_out_calculation
            if callbk.notraisepropertyerror or callbk.raisepropertyerror:
                raise err
            raise ConfigError(_('unable to carry out a calculation for "{}"'
                                ', {}').format(option.impl_get_display_name(), err), err)
        except ValueError as err:
            raise ValueError(_('the option "{0}" is used in a calculation but is invalid ({1})').format(option_bag.option.impl_get_display_name(), err))
        except AttributeError as err:
            raise ConfigError(_('impossible to calculate "{0}", {1}').format(option_bag.option.impl_get_display_name(),
                                                                                 err,
                                                                                 ))
        return value

    async def get_option_bag(config_bag,
                             opt,
                             index_,
                             self_calc):
        """build the OptionBag used to read `opt` at `index_`

        It works on a copy of `config_bag`; `self_calc` is True when `opt`
        is the option being calculated.
        """
        # don't validate if option is option that we tried to validate
        config_bag = config_bag.copy()
        config_bag.properties = config_bag.true_properties - {'warnings'}
        config_bag.set_permissive()
        #config_bag.properties -= {'warnings'}
        option_bag = OptionBag()
        option_bag.set_option(opt,
                              index_,
                              config_bag)
        if not self_calc:
            option_bag.properties = await config_bag.context.cfgimpl_get_settings().getproperties(option_bag)
        else:
            option_bag.config_bag.unrestraint()
            option_bag.config_bag.remove_validation()
            # if we are in properties calculation, cannot calculated properties
            option_bag.properties = await config_bag.context.cfgimpl_get_settings().getproperties(option_bag,
                                                                                                  apply_requires=False)
        return option_bag

    # fixed value
    if isinstance(callbk, ParamValue):
        return callbk.value

    # current index
    if isinstance(callbk, ParamIndex):
        return index

    # suffix of the current option
    if isinstance(callbk, ParamSuffix):
        if not option.issubdyn():
            raise ConfigError('option "{}" is not in a dynoptiondescription'.format(option.impl_get_display_name()))
        return option.impl_getsuffix()

    # value of the current option
    if isinstance(callbk, ParamSelfOption):
        if leadership_must_have_index and option.impl_get_leadership() and index is None:
            raise Break()
        value = await calc_self(callbk, option, index, orig_value, config_bag)
        if not callbk.todict:
            return value
        return {'name': option.impl_get_display_name(),
                'value': value}

    # value of an other option
    if isinstance(callbk, ParamOption):
        callbk_option = callbk.option
        if callbk_option.issubdyn():
            # build the dynamic option, from the ParamDynOption's attributes
            # or from the rootpath and suffix of the current option
            if isinstance(callbk, ParamDynOption):
                subdyn = callbk.dynoptiondescription
                rootpath = subdyn.impl_getpath() + callbk.suffix
                suffix = callbk.suffix
            else:
                if not option.impl_is_dynsymlinkoption():
                    msg = 'option "{}" is not dynamic in callback of the option "{}"'
                    raise ConfigError(_(msg).format(callbk_option.impl_get_display_name(),
                                                    option.impl_get_display_name(),
                                                    ))
                rootpath = option.rootpath
                suffix = option.impl_getsuffix()
                subdyn = callbk_option.getsubdyn()
            callbk_option = callbk_option.to_dynoption(rootpath,
                                                       suffix,
                                                       subdyn)
        if leadership_must_have_index and callbk_option.impl_get_leadership() and index is None:
            raise Break()
        if config_bag is undefined:
            return undefined
        if index is not None and callbk_option.impl_get_leadership() and \
                callbk_option.impl_get_leadership().in_same_group(option):
            if not callbk_option.impl_is_follower():
                # leader
                # (read without index, the element `index` is taken below)
                index_ = None
                with_index = True
            else:
                # follower
                # (read directly at `index`)
                index_ = index
                with_index = False
        else:
            index_ = None
            with_index = False
        path = callbk_option.impl_getpath()
        option_bag = await get_option_bag(config_bag,
                                          callbk_option,
                                          index_,
                                          False)
        value = await get_value(callbk,
                                option_bag,
                                path,
                                )
        if with_index:
            value = value[index]
        if not callbk.todict:
            return value
        return {'name': callbk_option.impl_get_display_name(),
                'value': value}
    raise ConfigError(_('unknown callback type {} in option {}').format(callbk,
                                                                         option.impl_get_display_name()))
|
|
|
|
|
|
async def carry_out_calculation(option,
                                callback: Callable,
                                callback_params: Optional[Params],
                                index: Optional[int],
                                config_bag: Optional[ConfigBag],
                                orig_value=undefined,
                                leadership_must_have_index: bool=False,
                                allow_value_error: bool=False,
                                force_value_warning: bool=False,
                                ):
    """resolve the parameters of a callback, then launch it for an option

    :param option: the option
    :param callback: the callback function
    :param callback_params: the callback's parameters: `args` gives the
                            positional arguments, `kwargs` the keyword ones
    :param index: handed to manager_callback with each parameter
    :param config_bag: handed to manager_callback with each parameter
    :param orig_value: handed to manager_callback with each parameter
    :param leadership_must_have_index: handed to manager_callback with each
                                       parameter
    :param allow_value_error: to know if carry_out_calculation can return ValueError or ValueWarning (for example if it's a validation)
    :param force_value_warning: transform valueError to ValueWarning object

    Each parameter is resolved by manager_callback:
      - if one of them is `undefined`, `undefined` is returned immediately
        and the callback is not launched
      - if Break is raised, the parameter is not given to the callback
      - if PropertiesOptionError is raised, it is raised again when the
        parameter has `raisepropertyerror`; otherwise a dict with the key
        'propertyerror' takes the place of the value when the parameter has
        `todict`, else the parameter is not given to the callback

    :raises LeadershipError: if the callback returns a list whereas the option
                             is a follower that is not a submulti
    """
    positional = []
    named = {}
    if callback_params:
        # a None key marks the positional parameters, so that both kinds of
        # parameters are handled by the same loop
        unnamed = ((None, param) for param in callback_params.args)
        for key, callbk in chain(unnamed, callback_params.kwargs.items()):
            try:
                value = await manager_callback(callbk,
                                               option,
                                               index,
                                               orig_value,
                                               config_bag,
                                               leadership_must_have_index)
            except PropertiesOptionError as err:
                if callbk.raisepropertyerror:
                    raise err
                if not callbk.todict:
                    continue
                value = {'propertyerror': str(err)}
            except Break:
                continue
            else:
                if value is undefined:
                    return undefined
            if key is None:
                positional.append(value)
            else:
                named[key] = value
    ret = calculate(option,
                    callback,
                    allow_value_error,
                    force_value_warning,
                    positional,
                    named)
    forbidden_list = isinstance(ret, list) and not option.impl_is_dynoptiondescription() and \
        option.impl_is_follower() and not option.impl_is_submulti()
    if not forbidden_list:
        return ret
    if positional or named:
        raise LeadershipError(_('the "{}" function with positional arguments "{}" '
                                'and keyword arguments "{}" must not return '
                                'a list ("{}") for the follower option "{}"'
                                '').format(callback.__name__,
                                           positional,
                                           named,
                                           ret,
                                           option.impl_get_display_name()))
    raise LeadershipError(_('the "{}" function must not return a list ("{}") '
                            'for the follower option "{}"'
                            '').format(callback.__name__,
                                       ret,
                                       option.impl_get_display_name()))
|
|
|
|
|
|
def calculate(option,
              callback: Callable,
              allow_value_error: bool,
              force_value_warning: bool,
              args,
              kwargs,
              ):
    """wrapper that launches the 'callback'

    :param option: the option, only used to build the error message
    :param callback: callback function
    :param allow_value_error: let a ValueError or ValueWarning raised by the
                              callback go through
    :param force_value_warning: with `allow_value_error`, raise a ValueWarning
                                built from the error's message instead of the
                                error itself
    :param args: in the callback's arity, the unnamed parameters
    :param kwargs: in the callback's arity, the named parameters
    :returns: what the callback returns
    :raises ConfigError: for any other error raised by the callback; the
                         original error is kept as the cause
    """
    try:
        return callback(*args, **kwargs)
    except (ValueError, ValueWarning) as err:
        if allow_value_error:
            if force_value_warning:
                raise ValueWarning(str(err))
            raise err
        error = err
    except Exception as err:
        error = err
    # a callable is not always a function (functools.partial, instance with
    # __call__, ...): without __name__ use its representation, so that the
    # real error is not hidden by an AttributeError
    callback_name = getattr(callback, '__name__', repr(callback))
    if args or kwargs:
        msg = _('unexpected error "{0}" in function "{1}" with arguments "{3}" and "{4}" '
                'for option "{2}"').format(str(error),
                                           callback_name,
                                           option.impl_get_display_name(),
                                           args,
                                           kwargs)
    else:
        msg = _('unexpected error "{0}" in function "{1}" for option "{2}"'
                '').format(str(error),
                           callback_name,
                           option.impl_get_display_name())
    # chain the original error to keep its traceback available
    raise ConfigError(msg) from error
|