add Calculation to properties

This commit is contained in:
2019-09-01 09:41:53 +02:00
parent 7c641961d3
commit bb2ecc94d9
27 changed files with 2358 additions and 574 deletions

View File

@ -14,8 +14,8 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Configuration management library written in python
"""
from .function import Params, ParamOption, ParamValue, ParamContext, \
tiramisu_copy, calc_value
from .function import tiramisu_copy, calc_value, calc_value_property_help
from .autolib import Calculation, Params, ParamOption, ParamSelfOption, ParamValue, ParamIndex, ParamContext
from .option import *
from .error import APIError
from .api import Config, MetaConfig, GroupConfig, MixConfig
@ -25,9 +25,12 @@ from .storage import default_storage, Storage, list_sessions, \
delete_session
allfuncs = ['Params',
'ParamOption',
allfuncs = ['Calculation',
'Params',
'ParamOption',
'ParamSelfOption',
'ParamValue',
'ParamIndex',
'ParamContext',
'MetaConfig',
'MixConfig',
@ -40,7 +43,8 @@ allfuncs = ['Params',
'list_sessions',
'delete_session',
'tiramisu_copy',
'calc_value']
'calc_value',
'calc_value_property_help']
allfuncs.extend(all_options)
del(all_options)
__all__ = tuple(allfuncs)

View File

@ -384,8 +384,6 @@ class TiramisuOptionProperty(CommonTiramisuOption):
def get(self,
only_raises=False):
"""Get properties for an option"""
option = self._option_bag.option
#self._test_follower_index()
if not only_raises:
return self._option_bag.properties
# do not check cache properties/permissives which are not save (unrestraint, ...)
@ -397,8 +395,8 @@ class TiramisuOptionProperty(CommonTiramisuOption):
if prop in FORBIDDEN_SET_PROPERTIES:
raise ConfigError(_('cannot add this property: "{0}"').format(
' '.join(prop)))
props = self._settings.getproperties(self._option_bag,
apply_requires=False)
props = self._settings._p_.getproperties(self._option_bag.path,
option.impl_getproperties())
self._settings.setproperties(self._option_bag.path,
props | {prop},
self._option_bag,
@ -407,8 +405,8 @@ class TiramisuOptionProperty(CommonTiramisuOption):
def pop(self, prop):
"""Remove new property for an option"""
option = self._option_bag.option
props = self._settings.getproperties(self._option_bag,
apply_requires=False)
props = self._settings._p_.getproperties(self._option_bag.path,
option.impl_getproperties())
self._settings.setproperties(self._option_bag.path,
props - {prop},
self._option_bag,

View File

@ -19,22 +19,159 @@
# ____________________________________________________________
"enables us to carry out a calculation and return an option's value"
from typing import Any, Optional, Union, Callable, Dict, List
from itertools import chain
from .error import PropertiesOptionError, ConfigError, LeadershipError
from .i18n import _
from .setting import undefined, ConfigBag, OptionBag, Undefined
from .storage import get_default_values_storages, get_default_settings_storages
from .function import ParamValue, ParamContext, ParamIndex, ParamOption, Params
# ____________________________________________________________
class Params:
__slots__ = ('args', 'kwargs')
def __init__(self, args=None, kwargs=None, **kwgs):
if args is None:
args = tuple()
if kwargs is None:
kwargs = {}
if kwgs:
kwargs.update(kwgs)
if isinstance(args, Param):
args = (args,)
else:
if not isinstance(args, tuple):
raise ValueError(_('args in params must be a tuple'))
for arg in args:
if not isinstance(arg, Param):
raise ValueError(_('arg in params must be a Param'))
if not isinstance(kwargs, dict):
raise ValueError(_('kwargs in params must be a dict'))
for arg in kwargs.values():
if not isinstance(arg, Param):
raise ValueError(_('arg in params must be a Param'))
self.args = args
self.kwargs = kwargs
class Param:
pass
class ParamOption(Param):
__slots__ = ('todict',
'error',
'option',
'notraisepropertyerror',
'raisepropertyerror')
def __init__(self,
option: 'Option',
notraisepropertyerror: bool=False,
raisepropertyerror: bool=False,
todict: bool=False) -> None:
if __debug__ and not hasattr(option, 'impl_is_symlinkoption'):
raise ValueError(_('paramoption needs an option not {}').format(type(option)))
if option.impl_is_symlinkoption():
cur_opt = option.impl_getopt()
else:
cur_opt = option
assert isinstance(notraisepropertyerror, bool), _('param must have a boolean not a {} for notraisepropertyerror').format(type(notraisepropertyerror))
assert isinstance(raisepropertyerror, bool), _('param must have a boolean not a {} for raisepropertyerror').format(type(raisepropertyerror))
self.todict = todict
self.option = cur_opt
self.notraisepropertyerror = notraisepropertyerror
self.raisepropertyerror = raisepropertyerror
class ParamSelfOption(Param):
__slots__ = ('todict',)
def __init__(self,
todict: bool=False) -> None:
self.todict = todict
class ParamValue(Param):
__slots__ = ('value',)
def __init__(self, value):
self.value = value
class ParamContext(Param):
__slots__ = tuple()
class ParamIndex(Param):
__slots__ = tuple()
class Calculation:
__slots__ = ('function',
'params',
'help_function',
'has_index')
def __init__(self,
function: Callable,
params: Optional[Params]=None,
help_function: Optional[Callable]=None):
assert isinstance(function, Callable), _('first argument ({0}) must be a function').format(function)
if help_function:
assert isinstance(help_function, Callable), _('help_function ({0}) must be a function').format(help_function)
self.help_function = help_function
else:
self.help_function = None
self.function = function
if params:
self.params = params
for arg in chain(self.params.args, self.params.kwargs.values()):
if isinstance(arg, ParamIndex):
self.has_index = True
break
else:
self.has_index = False
else:
self.has_index = False
def execute(self,
option_bag: OptionBag,
leadership_must_have_index: bool=False) -> Any:
if leadership_must_have_index and not self.has_index:
leadership_must_have_index = False
return carry_out_calculation(option_bag.option,
callback=self.function,
callback_params=self.params,
index=option_bag.index,
config_bag=option_bag.config_bag,
fromconsistency=option_bag.fromconsistency,
leadership_must_have_index=leadership_must_have_index)
def help(self,
option_bag: OptionBag,
leadership_must_have_index: bool=False) -> str:
if not self.help_function:
return self.execute(option_bag,
leadership_must_have_index=leadership_must_have_index)
if leadership_must_have_index and not self.has_index:
leadership_must_have_index = False
return carry_out_calculation(option_bag.option,
callback=self.help_function,
callback_params=self.params,
index=option_bag.index,
config_bag=option_bag.config_bag,
fromconsistency=option_bag.fromconsistency,
leadership_must_have_index=leadership_must_have_index)
class Break(Exception):
pass
def manager_callback(callbk: Union[ParamOption, ParamValue],
option,
index: Optional[int],
orig_value,
config_bag: ConfigBag,
fromconsistency: List) -> Any:
fromconsistency: List,
leadership_must_have_index: bool) -> Any:
"""replace Param by true value"""
if isinstance(callbk, ParamValue):
return callbk.value
@ -46,17 +183,24 @@ def manager_callback(callbk: Union[ParamOption, ParamValue],
# Not an option, set full context
return config_bag.context.duplicate(force_values=get_default_values_storages(),
force_settings=get_default_settings_storages())
opt = callbk.option
if isinstance(callbk, ParamSelfOption):
opt = option
else:
# it's ParamOption
opt = callbk.option
if opt.issubdyn():
opt = opt.to_dynoption(option.rootpath,
option.impl_getsuffix())
path = opt.impl_getpath()
is_follower = opt.impl_is_follower()
if leadership_must_have_index and opt.impl_get_leadership() and index is None:
raise Break()
if index is not None and opt.impl_get_leadership() and \
opt.impl_get_leadership().in_same_group(option):
if opt == option:
index_ = None
with_index = False
elif opt.impl_is_follower():
elif is_follower:
index_ = index
with_index = False
else:
@ -66,35 +210,43 @@ def manager_callback(callbk: Union[ParamOption, ParamValue],
index_ = None
with_index = False
if opt == option and orig_value is not undefined and \
(not opt.impl_is_follower() or index is None):
return orig_value
# don't validate if option is option that we tried to validate
config_bag = config_bag.copy()
config_bag.set_permissive()
config_bag.properties -= {'warnings'}
option_bag = OptionBag()
option_bag.set_option(opt,
path,
index_,
config_bag)
if fromconsistency:
option_bag.fromconsistency = fromconsistency.copy()
if opt == option:
option_bag.config_bag.unrestraint()
option_bag.config_bag.remove_validation()
try:
# get value
value = config_bag.context.getattr(path,
option_bag)
if with_index:
return value[index]
(not is_follower or index is None):
value = orig_value
else:
# don't validate if option is option that we tried to validate
config_bag = config_bag.copy()
config_bag.properties = config_bag.true_properties - {'warnings'}
config_bag.set_permissive()
#config_bag.properties -= {'warnings'}
option_bag = OptionBag()
option_bag.set_option(opt,
path,
index_,
config_bag)
if fromconsistency:
option_bag.fromconsistency = fromconsistency.copy()
if opt == option:
option_bag.config_bag.unrestraint()
option_bag.config_bag.remove_validation()
# if we are in properties calculation, cannot calculated properties
option_bag.properties = config_bag.context.cfgimpl_get_settings().getproperties(option_bag,
apply_requires=False)
try:
# get value
value = config_bag.context.getattr(path,
option_bag)
if with_index:
value = value[index]
except PropertiesOptionError as err:
# raise PropertiesOptionError (which is catched) because must not add value None in carry_out_calculation
if callbk.notraisepropertyerror or callbk.raisepropertyerror:
raise err
raise ConfigError(_('unable to carry out a calculation for "{}"'
', {}').format(option.impl_get_display_name(), err), err)
if not callbk.todict:
return value
except PropertiesOptionError as err:
# raise PropertiesOptionError (which is catched) because must not add value None in carry_out_calculation
if callbk.notraisepropertyerror:
raise err
raise ConfigError(_('unable to carry out a calculation for "{}"'
', {}').format(option.impl_get_display_name(), err), err)
return {'name': opt.impl_get_display_name(),
'value': value}
def carry_out_calculation(option,
@ -104,8 +256,8 @@ def carry_out_calculation(option,
config_bag: Optional[ConfigBag],
fromconsistency: List,
orig_value=undefined,
leadership_must_have_index: bool=False,
is_validator: int=False):
"""a function that carries out a calculation for an option's value
:param option: the option
@ -227,12 +379,18 @@ def carry_out_calculation(option,
index,
orig_value,
config_bag,
fromconsistency)
fromconsistency,
leadership_must_have_index)
if value is undefined:
return undefined
args.append(value)
except PropertiesOptionError:
pass
except PropertiesOptionError as err:
if callbk.raisepropertyerror:
raise err
if callbk.todict:
args.append({'propertyerror': str(err)})
except Break:
continue
for key, callbk in callback_params.kwargs.items():
try:
value = manager_callback(callbk,
@ -240,12 +398,18 @@ def carry_out_calculation(option,
index,
orig_value,
config_bag,
fromconsistency)
fromconsistency,
leadership_must_have_index)
if value is undefined:
return undefined
kwargs[key] = value
except PropertiesOptionError:
pass
except PropertiesOptionError as err:
if callbk.raisepropertyerror:
raise err
if callbk.todict:
kwargs[key] = {'propertyerror': str(err)}
except Break:
continue
ret = calculate(option,
callback,
is_validator,
@ -290,6 +454,8 @@ def calculate(option,
raise err
error = err
except Exception as err:
#import traceback
#traceback.print_exc()
error = err
if args or kwargs:
msg = _('unexpected error "{0}" in function "{1}" with arguments "{3}" and "{4}" '

View File

@ -269,10 +269,9 @@ class SubConfig(object):
def getattr(self,
name,
option_bag,
from_follower=False):
from_follower=False,
needs_re_verify_follower_properties=False):
"""
attribute notation mechanism for accessing the value of an option
:param name: attribute name
:return: option's value if name is an option name, OptionDescription
otherwise
"""
@ -298,7 +297,7 @@ class SubConfig(object):
return context.getattr(soption_bag.path,
soption_bag)
if not from_follower or option_bag.option.impl_getrequires():
if not from_follower or needs_re_verify_follower_properties:
self.cfgimpl_get_settings().validate_properties(option_bag)
if option.impl_is_follower() and not from_follower:
@ -311,6 +310,8 @@ class SubConfig(object):
length,
option_bag.index))
if option.impl_is_follower() and option_bag.index is None:
needs_re_verify_follower_properties = option_bag.option.impl_getrequires() or \
self.cfgimpl_get_settings().has_properties_index(option_bag)
value = []
for idx in range(length):
soption_bag = OptionBag()
@ -322,7 +323,8 @@ class SubConfig(object):
try:
value.append(self.getattr(name,
soption_bag,
from_follower=True))
from_follower=True,
needs_re_verify_follower_properties=needs_re_verify_follower_properties))
except PropertiesOptionError as err:
value.append(err)
else:
@ -412,40 +414,7 @@ class SubConfig(object):
withoption=None,
withvalue=undefined,
fullpath=False):
"""exports the whole config into a `dict`, for example:
>>> print(cfg.make_dict())
{'od2.var4': None, 'od2.var5': None, 'od2.var6': None}
:param flatten: returns a dict(name=value) instead of a dict(path=value)
::
>>> print(cfg.make_dict(flatten=True))
{'var5': None, 'var4': None, 'var6': None}
:param withoption: returns the options that are present in the very same
`OptionDescription` than the `withoption` itself::
>>> print(cfg.make_dict(withoption='var1'))
{'od2.var4': None, 'od2.var5': None,
'od2.var6': None,
'od2.var1': u'value',
'od1.var1': None,
'od1.var3': None,
'od1.var2': None}
:param withvalue: returns the options that have the value `withvalue`
::
>>> print(c.make_dict(withoption='var1',
withvalue=u'value'))
{'od2.var4': None,
'od2.var5': None,
'od2.var6': None,
'od2.var1': u'value'}
"""exports the whole config into a `dict`
:returns: dict of Option's name (or path) and values
"""
pathsvalues = {}

View File

@ -30,7 +30,7 @@ def display_list(lst, separator='and', add_quote=False):
ret = lst[0]
if not isinstance(ret, str):
ret = str(ret)
if add_quote:
if add_quote and not ret.startswith('"'):
ret = '"{}"'.format(ret)
return ret
else:
@ -39,13 +39,13 @@ def display_list(lst, separator='and', add_quote=False):
for l in lst[:-1]:
if not isinstance(l, str):
l = str(l)
if add_quote:
if add_quote and not l.startswith('"'):
l = '"{}"'.format(l)
lst_.append(_(l))
last = lst[-1]
if not isinstance(last, str):
last = str(_(last))
if add_quote:
if add_quote and not last.startswith('"'):
last = '"{}"'.format(last)
return ', '.join(lst_) + _(' {} ').format(separator) + '{}'.format(last)
@ -91,7 +91,6 @@ class PropertiesOptionError(AttributeError):
return 'error'
req = self._settings.apply_requires(self._option_bag,
True)
#if req != {} or self._orig_opt is not None:
if req != {}:
only_one = len(req) == 1
msg = []
@ -99,8 +98,16 @@ class PropertiesOptionError(AttributeError):
msg.append('"{0}" ({1})'.format(action, display_list(msg_, add_quote=False)))
msg = display_list(msg, add_quote=False)
else:
only_one = len(self.proptype) == 1
msg = display_list(list(self.proptype), add_quote=True)
properties = list(self._settings.calc_raises_properties(self._option_bag,
apply_requires=False))
for property_ in self._settings.get_calculated_properties(self._option_bag):
properties.append(property_.help(self._option_bag))
if not properties:
# if proptype == ['mandatory']
properties = self.proptype
only_one = len(properties) == 1
msg = display_list(properties, add_quote=True)
if only_one:
prop_msg = _('property')
else:

View File

@ -14,276 +14,305 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from typing import Any, List, Optional
from operator import add, mul, sub, truediv
from .setting import undefined
from .i18n import _
class Params:
__slots__ = ('args', 'kwargs')
def __init__(self, args=None, kwargs=None, **kwgs):
if args is None:
args = tuple()
if kwargs is None:
kwargs = {}
if kwgs:
kwargs.update(kwgs)
if isinstance(args, Param):
args = (args,)
else:
if not isinstance(args, tuple):
raise ValueError(_('args in params must be a tuple'))
for arg in args:
if not isinstance(arg, Param):
raise ValueError(_('arg in params must be a Param'))
if not isinstance(kwargs, dict):
raise ValueError(_('kwargs in params must be a dict'))
for arg in kwargs.values():
if not isinstance(arg, Param):
raise ValueError(_('arg in params must be a Param'))
self.args = args
self.kwargs = kwargs
class Param:
pass
class ParamOption(Param):
__slots__ = ('option',
'notraisepropertyerror')
def __init__(self,
option: 'Option',
notraisepropertyerror: bool=False) -> None:
if __debug__ and not hasattr(option, 'impl_is_symlinkoption'):
raise ValueError(_('paramoption needs an option not {}').format(type(option)))
if option.impl_is_symlinkoption():
cur_opt = option.impl_getopt()
else:
cur_opt = option
if not isinstance(notraisepropertyerror, bool):
raise ValueError(_('param must have a boolean'
' not a {} for notraisepropertyerror'
).format(type(notraisepropertyerror)))
self.option = cur_opt
self.notraisepropertyerror = notraisepropertyerror
class ParamValue(Param):
__slots__ = ('value',)
def __init__(self, value):
self.value = value
class ParamContext(Param):
__slots__ = tuple()
class ParamIndex(Param):
__slots__ = tuple()
from .setting import undefined
from .error import display_list
def tiramisu_copy(val): # pragma: no cover
return val
def calc_value(*args: List[Any],
multi: bool=False,
default: Any=undefined,
condition: Any=undefined,
expected: Any=undefined,
condition_operator: str='AND',
allow_none: bool=False,
remove_duplicate_value: bool=False,
join: Optional[str]=None,
min_args_len: Optional[int]=None,
operator: Optional[str]=None,
index: Optional[int]=None,
**kwargs) -> Any:
"""calculate value
:param multi: value returns must be a list of value
:param default: default value if condition is not matched or if args is empty
if there is more than one default value, set default_0, default_1, ...
:param condition: test if condition is equal to expected value
if there is more than one condition, set condition_0, condition_1, ...
:param expected: value expected for all conditions
if expected value is different between condition, set expected_0, expected_1, ...
:param condition_operator: OR or AND operator for condition
:param allow_none: if False, do not return list in None is present in list
:param remove_duplicate_value: if True, remote duplicated value
:param join: join all args with specified characters
:param min_args_len: if number of arguments is smaller than this value, return default value
:param operator: operator
class CalcValue:
def __call__(self,
*args: List[Any],
multi: bool=False,
default: Any=undefined,
condition: Any=undefined,
no_condition_is_invalid: Any=False,
expected: Any=undefined,
condition_operator: str='AND',
inverse_condition: bool=False,
allow_none: bool=False,
remove_duplicate_value: bool=False,
join: Optional[str]=None,
min_args_len: Optional[int]=None,
operator: Optional[str]=None,
index: Optional[int]=None,
**kwargs) -> Any:
"""calculate value
:param args: list of value
:param multi: value returns must be a list of value
:param default: default value if condition is not matched or if args is empty
if there is more than one default value, set default_0, default_1, ...
:param condition: test if condition is equal to expected value
if there is more than one condition, set condition_0, condition_1, ...
:param expected: value expected for all conditions
if expected value is different between condition, set expected_0, expected_1, ...
:param no_condition_is_invalid: if no condition and not condition_0, condition_1, ... (for
example if option is disabled) consider that condition not matching
:param condition_operator: OR or AND operator for condition
:param allow_none: if False, do not return list in None is present in list
:param remove_duplicate_value: if True, remote duplicated value
:param join: join all args with specified characters
:param min_args_len: if number of arguments is smaller than this value, return default value
:param operator: 'add', 'mul', 'div' or 'sub' all args (args must be integer value)
:param index: index for follower
examples:
* you want to copy value from an option to an other option:
>>> from tiramisu import calc_value, StrOption, OptionDescription, Config, Params, ParamOption
>>> val1 = StrOption('val1', '', 'val1')
>>> val2 = StrOption('val2', '', callback=calc_value, callback_params=Params(ParamOption(val1)))
>>> od = OptionDescription('root', '', [val1, val2])
>>> cfg = Config(od)
>>> cfg.value.dict()
{'val1': 'val1', 'val2': 'val1'}
examples:
* you want to copy value from an option to an other option:
>>> from tiramisu import calc_value, StrOption, OptionDescription, Config, Params, ParamOption
>>> val1 = StrOption('val1', '', 'val1')
>>> val2 = StrOption('val2', '', callback=calc_value, callback_params=Params(ParamOption(val1)))
>>> od = OptionDescription('root', '', [val1, val2])
>>> cfg = Config(od)
>>> cfg.value.dict()
{'val1': 'val1', 'val2': 'val1'}
* you want to copy values from two options in one multi option
>>> from tiramisu import calc_value, StrOption, OptionDescription, Config, Params, ParamOption, ParamValue
>>> val1 = StrOption('val1', "", 'val1')
>>> val2 = StrOption('val2', "", 'val2')
>>> val3 = StrOption('val3', "", multi=True, callback=calc_value, callback_params=Params((ParamOption(val1), ParamOption(val2)), multi=ParamValue(True)))
>>> od = OptionDescription('root', '', [val1, val2, val3])
>>> cfg = Config(od)
>>> cfg.value.dict()
{'val1': 'val1', 'val2': 'val2', 'val3': ['val1', 'val2']}
* you want to copy values from two options in one multi option
>>> from tiramisu import calc_value, StrOption, OptionDescription, Config, Params, ParamOption, ParamValue
>>> val1 = StrOption('val1', "", 'val1')
>>> val2 = StrOption('val2', "", 'val2')
>>> val3 = StrOption('val3', "", multi=True, callback=calc_value, callback_params=Params((ParamOption(val1), ParamOption(val2)), multi=ParamValue(True)))
>>> od = OptionDescription('root', '', [val1, val2, val3])
>>> cfg = Config(od)
>>> cfg.value.dict()
{'val1': 'val1', 'val2': 'val2', 'val3': ['val1', 'val2']}
* you want to copy a value from an option is it not disabled, otherwise set 'default_value'
>>> from tiramisu import calc_value, StrOption, OptionDescription, Config, Params, ParamOption, ParamValue
>>> val1 = StrOption('val1', '', 'val1')
>>> val2 = StrOption('val2', '', callback=calc_value, callback_params=Params(ParamOption(val1, True), default=ParamValue('default_value')))
>>> od = OptionDescription('root', '', [val1, val2])
>>> cfg = Config(od)
>>> cfg.property.read_write()
>>> cfg.value.dict()
{'val1': 'val1', 'val2': 'val1'}
>>> cfg.option('val1').property.add('disabled')
>>> cfg.value.dict()
{'val2': 'default_value'}
* you want to copy a value from an option if it not disabled, otherwise set 'default_value'
>>> from tiramisu import calc_value, StrOption, OptionDescription, Config, Params, ParamOption, ParamValue
>>> val1 = StrOption('val1', '', 'val1')
>>> val2 = StrOption('val2', '', callback=calc_value, callback_params=Params(ParamOption(val1, True), default=ParamValue('default_value')))
>>> od = OptionDescription('root', '', [val1, val2])
>>> cfg = Config(od)
>>> cfg.property.read_write()
>>> cfg.value.dict()
{'val1': 'val1', 'val2': 'val1'}
>>> cfg.option('val1').property.add('disabled')
>>> cfg.value.dict()
{'val2': 'default_value'}
* you want to copy value from an option is an other is True, otherwise set 'default_value'
>>> from tiramisu import calc_value, BoolOption, StrOption, OptionDescription, Config, Params, ParamOption, ParamValue
>>> boolean = BoolOption('boolean', '', True)
>>> val1 = StrOption('val1', '', 'val1')
>>> val2 = StrOption('val2', '', callback=calc_value, callback_params=Params(ParamOption(val1, True),
... default=ParamValue('default_value'),
... condition=ParamOption(boolean),
... expected=ParamValue(True)))
>>> od = OptionDescription('root', '', [boolean, val1, val2])
>>> cfg = Config(od)
>>> cfg.property.read_write()
>>> cfg.value.dict()
{'boolean': True, 'val1': 'val1', 'val2': 'val1'}
>>> cfg.option('boolean').value.set(False)
>>> cfg.value.dict()
{'boolean': False, 'val1': 'val1', 'val2': 'default_value'}
* you want to copy value from an option if an other is True, otherwise set 'default_value'
>>> from tiramisu import calc_value, BoolOption, StrOption, OptionDescription, Config, Params, ParamOption, ParamValue
>>> boolean = BoolOption('boolean', '', True)
>>> val1 = StrOption('val1', '', 'val1')
>>> val2 = StrOption('val2', '', callback=calc_value, callback_params=Params(ParamOption(val1, True),
... default=ParamValue('default_value'),
... condition=ParamOption(boolean),
... expected=ParamValue(True)))
>>> od = OptionDescription('root', '', [boolean, val1, val2])
>>> cfg = Config(od)
>>> cfg.property.read_write()
>>> cfg.value.dict()
{'boolean': True, 'val1': 'val1', 'val2': 'val1'}
>>> cfg.option('boolean').value.set(False)
>>> cfg.value.dict()
{'boolean': False, 'val1': 'val1', 'val2': 'default_value'}
* you want to copy option even if None is present
>>> from tiramisu import calc_value, StrOption, OptionDescription, Config, Params, ParamOption, ParamValue
>>> val1 = StrOption('val1', "", 'val1')
>>> val2 = StrOption('val2', "")
>>> val3 = StrOption('val3', "", multi=True, callback=calc_value, callback_params=Params((ParamOption(val1), ParamOption(val2)), multi=ParamValue(True), allow_none=ParamValue(True)))
>>> od = OptionDescription('root', '', [val1, val2, val3])
>>> cfg = Config(od)
>>> cfg.value.dict()
{'val1': 'val1', 'val2': None, 'val3': ['val1', None]}
* you want to copy option even if None is present
>>> from tiramisu import calc_value, StrOption, OptionDescription, Config, Params, ParamOption, ParamValue
>>> val1 = StrOption('val1', "", 'val1')
>>> val2 = StrOption('val2', "")
>>> val3 = StrOption('val3', "", multi=True, callback=calc_value, callback_params=Params((ParamOption(val1), ParamOption(val2)), multi=ParamValue(True), allow_none=ParamValue(True)))
>>> od = OptionDescription('root', '', [val1, val2, val3])
>>> cfg = Config(od)
>>> cfg.value.dict()
{'val1': 'val1', 'val2': None, 'val3': ['val1', None]}
* you want uniq value
>>> from tiramisu import calc_value, StrOption, OptionDescription, Config, Params, ParamOption, ParamValue
>>> val1 = StrOption('val1', "", 'val1')
>>> val2 = StrOption('val2', "", 'val1')
>>> val3 = StrOption('val3', "", multi=True, callback=calc_value, callback_params=Params((ParamOption(val1), ParamOption(val2)), multi=ParamValue(True), remove_duplicate_value=ParamValue(True)))
>>> od = OptionDescription('root', '', [val1, val2, val3])
>>> cfg = Config(od)
>>> cfg.value.dict()
{'val1': 'val1', 'val2': 'val1', 'val3': ['val1']}
* you want uniq value
>>> from tiramisu import calc_value, StrOption, OptionDescription, Config, Params, ParamOption, ParamValue
>>> val1 = StrOption('val1', "", 'val1')
>>> val2 = StrOption('val2', "", 'val1')
>>> val3 = StrOption('val3', "", multi=True, callback=calc_value, callback_params=Params((ParamOption(val1), ParamOption(val2)), multi=ParamValue(True), remove_duplicate_value=ParamValue(True)))
>>> od = OptionDescription('root', '', [val1, val2, val3])
>>> cfg = Config(od)
>>> cfg.value.dict()
{'val1': 'val1', 'val2': 'val1', 'val3': ['val1']}
* you want to join two values with '.'
>>> from tiramisu import calc_value, StrOption, OptionDescription, Config, Params, ParamOption, ParamValue
>>> val1 = StrOption('val1', "", 'val1')
>>> val2 = StrOption('val2', "", 'val2')
>>> val3 = StrOption('val3', "", callback=calc_value, callback_params=Params((ParamOption(val1), ParamOption(val2)), join=ParamValue('.')))
>>> od = OptionDescription('root', '', [val1, val2, val3])
>>> cfg = Config(od)
>>> cfg.value.dict()
{'val1': 'val1', 'val2': 'val2', 'val3': 'val1.val2'}
* you want to join two values with '.'
>>> from tiramisu import calc_value, StrOption, OptionDescription, Config, Params, ParamOption, ParamValue
>>> val1 = StrOption('val1', "", 'val1')
>>> val2 = StrOption('val2', "", 'val2')
>>> val3 = StrOption('val3', "", callback=calc_value, callback_params=Params((ParamOption(val1), ParamOption(val2)), join=ParamValue('.')))
>>> od = OptionDescription('root', '', [val1, val2, val3])
>>> cfg = Config(od)
>>> cfg.value.dict()
{'val1': 'val1', 'val2': 'val2', 'val3': 'val1.val2'}
* you want join three values, only if almost three values are set
>>> from tiramisu import calc_value, StrOption, OptionDescription, Config, Params, ParamOption, ParamValue
>>> val1 = StrOption('val1', "", 'val1')
>>> val2 = StrOption('val2', "", 'val2')
>>> val3 = StrOption('val3', "", 'val3')
>>> val4 = StrOption('val4', "", callback=calc_value, callback_params=Params((ParamOption(val1), ParamOption(val2), ParamOption(val3, True)), join=ParamValue('.'), min_args_len=ParamValue(3)))
>>> od = OptionDescription('root', '', [val1, val2, val3, val4])
>>> cfg = Config(od)
>>> cfg.property.read_write()
>>> cfg.value.dict()
{'val1': 'val1', 'val2': 'val2', 'val3': 'val3', 'val4': 'val1.val2.val3'}
>>> cfg.option('val3').property.add('disabled')
>>> cfg.value.dict()
{'val1': 'val1', 'val2': 'val2', 'val4': ''}
* you want join three values, only if almost three values are set
>>> from tiramisu import calc_value, StrOption, OptionDescription, Config, Params, ParamOption, ParamValue
>>> val1 = StrOption('val1', "", 'val1')
>>> val2 = StrOption('val2', "", 'val2')
>>> val3 = StrOption('val3', "", 'val3')
>>> val4 = StrOption('val4', "", callback=calc_value, callback_params=Params((ParamOption(val1), ParamOption(val2), ParamOption(val3, True)), join=ParamValue('.'), min_args_len=ParamValue(3)))
>>> od = OptionDescription('root', '', [val1, val2, val3, val4])
>>> cfg = Config(od)
>>> cfg.property.read_write()
>>> cfg.value.dict()
{'val1': 'val1', 'val2': 'val2', 'val3': 'val3', 'val4': 'val1.val2.val3'}
>>> cfg.option('val3').property.add('disabled')
>>> cfg.value.dict()
{'val1': 'val1', 'val2': 'val2', 'val4': ''}
* you want to add all values
>>> from tiramisu import calc_value, IntOption, OptionDescription, Config, Params, ParamOption, ParamValue
>>> val1 = IntOption('val1', "", 1)
>>> val2 = IntOption('val2', "", 2)
>>> val3 = IntOption('val3', "", callback=calc_value, callback_params=Params((ParamOption(val1), ParamOption(val2)), operator=ParamValue('add')))
>>> od = OptionDescription('root', '', [val1, val2, val3])
>>> cfg = Config(od)
>>> cfg.value.dict()
{'val1': 1, 'val2': 2, 'val3': 3}
* you want to add all values
>>> from tiramisu import calc_value, IntOption, OptionDescription, Config, Params, ParamOption, ParamValue
>>> val1 = IntOption('val1', "", 1)
>>> val2 = IntOption('val2', "", 2)
>>> val3 = IntOption('val3', "", callback=calc_value, callback_params=Params((ParamOption(val1), ParamOption(val2)), operator=ParamValue('add')))
>>> od = OptionDescription('root', '', [val1, val2, val3])
>>> cfg = Config(od)
>>> cfg.value.dict()
{'val1': 1, 'val2': 2, 'val3': 3}
"""
def value_from_kwargs(value: Any, pattern: str, to_dict: bool=False) -> Any:
"""
self.args = args
self.condition = condition
self.expected = expected
self.condition_operator = condition_operator
self.inverse_condition = inverse_condition
self.kwargs = kwargs
self.no_condition_is_invalid = no_condition_is_invalid
value = self.get_value(default,
min_args_len)
if not multi:
if join is not None:
value = join.join(value)
elif value and operator:
new_value = value[0]
op = {'mul': mul,
'add': add,
'div': truediv,
'sub': sub}[operator]
for val in value[1:]:
new_value = op(new_value, val)
value = new_value
elif value == []:
value = None
else:
value = value[0]
if isinstance(value, list) and index is not None:
if len(value) > index:
value = value[index]
else:
value = None
elif None in value and not allow_none:
value = []
elif remove_duplicate_value:
new_value = []
for val in value:
if val not in new_value:
new_value.append(val)
value = new_value
return value
def value_from_kwargs(self,
value: Any,
pattern: str,
to_dict: bool=False,
empty_test=undefined) -> Any:
# if value attribute exist return it's value
# otherwise pattern_0, pattern_1, ...
# otherwise undefined
if value is not undefined:
if value is not empty_test:
if to_dict == 'all':
returns = {0: value}
returns = {None: value}
else:
returns = value
else:
kwargs_matches = {}
len_pattern = len(pattern)
for key in kwargs.keys():
for key in self.kwargs.keys():
if key.startswith(pattern):
index = int(key[len_pattern:])
kwargs_matches[index] = kwargs[key]
pattern_value = self.kwargs[key]
if isinstance(pattern_value, dict):
pattern_value = pattern_value['value']
kwargs_matches[index] = pattern_value
if not kwargs_matches:
return undefined
keys = sorted(kwargs_matches)
if to_dict:
returns = {}
returns = undefined
else:
returns = []
for key in keys:
keys = sorted(kwargs_matches)
if to_dict:
returns[key] = kwargs_matches[key]
returns = {}
else:
returns.append(kwargs_matches[key])
returns = []
for key in keys:
if to_dict:
returns[key] = kwargs_matches[key]
else:
returns.append(kwargs_matches[key])
return returns
def is_condition_matches():
calculated_conditions = value_from_kwargs(condition, 'condition_', to_dict='all')
if condition is not undefined:
def is_condition_matches(self,
condition_value):
calculated_conditions = self.value_from_kwargs(condition_value,
'condition_',
to_dict='all')
if calculated_conditions is undefined:
is_matches = not self.no_condition_is_invalid
else:
is_matches = None
calculated_expected = value_from_kwargs(expected, 'expected_', to_dict=True)
calculated_expected = self.value_from_kwargs(self.expected,
'expected_',
to_dict=True)
calculated_inverse = self.value_from_kwargs(self.inverse_condition,
'inverse_condition_',
to_dict=True,
empty_test=False)
for idx, calculated_condition in calculated_conditions.items():
if isinstance(calculated_expected, dict):
current_matches = calculated_condition == calculated_expected[idx]
if idx is not None:
current_matches = calculated_condition == calculated_expected[idx]
else:
current_matches = calculated_condition in calculated_expected.values()
else:
current_matches = calculated_condition == calculated_expected
if isinstance(calculated_inverse, dict) and idx in calculated_inverse:
inverse_condition = calculated_inverse[idx]
else:
inverse_condition = False
if is_matches is None:
is_matches = current_matches
elif condition_operator == 'AND':
if self.condition_operator == 'AND':
is_matches = is_matches and current_matches
elif condition_operator == 'OR':
if inverse_condition:
is_matches = not is_matches
if not is_matches:
break
elif self.condition_operator == 'OR':
is_matches = is_matches or current_matches
if inverse_condition:
is_matches = not is_matches
if is_matches:
break
else:
raise ValueError(_('unexpected {} condition_operator in calc_value').format(condition_operator))
else:
is_matches = True
raise ValueError(_('unexpected {} condition_operator in calc_value').format(self.condition_operator))
is_matches = is_matches and not self.inverse_condition \
or not is_matches and self.inverse_condition
return is_matches
def get_value():
if not is_condition_matches():
def get_value(self,
default,
min_args_len):
if isinstance(self.condition, dict):
if 'value' in self.condition:
condition_value = self.condition['value']
else:
condition_value = undefined
else:
condition_value = self.condition
condition_matches = self.is_condition_matches(condition_value)
if not condition_matches:
# force to default
value = []
else:
value = list(args)
value = self.get_args()
if min_args_len and not len(value) >= min_args_len:
value = []
if value == []:
# default value
new_default = value_from_kwargs(default, 'default_')
new_default = self.value_from_kwargs(default,
'default_')
if new_default is not undefined:
if not isinstance(new_default, list):
value = [new_default]
@ -291,34 +320,72 @@ def calc_value(*args: List[Any],
value = new_default
return value
value = get_value()
if not multi:
if join is not None:
value = join.join(value)
elif value and operator:
new_value = value[0]
op = {'mul': mul,
'add': add,
'div': truediv,
'sub': sub}[operator]
for val in value[1:]:
new_value = op(new_value, val)
value = new_value
elif value == []:
value = None
def get_args(self):
return list(self.args)
class CalcValuePropertyHelp(CalcValue):
def get_name(self):
return self.condition['name']
def get_indexed_name(self, index):
return self.kwargs.get(f'condition_{index}')['name']
def has_condition_kwargs(self):
for condition in self.kwargs:
if condition.startswith('condition_'):
return True
return False
def build_arg(self, name, value):
#if isinstance(option, tuple):
# if not inverse:
# msg = _('the calculated value is {0}').format(display_value)
# else:
# msg = _('the calculated value is not {0}').format(display_value)
#else:
if not self.inverse_condition:
msg = _('the value of "{0}" is {1}').format(name, value)
else:
value = value[0]
if isinstance(value, list) and index is not None:
if len(value) > index:
value = value[index]
msg = _('the value of "{0}" is not {1}').format(name, value)
return msg
def get_args(self):
args = super().get_args()
if args:
if len(self.args) != 1:
raise ValueError(_('only one property is allowed for a calculation'))
action = args[0]
calculated_expected = self.value_from_kwargs(self.expected,
'expected_',
to_dict=True)
if self.condition is not undefined:
if 'propertyerror' in self.condition:
msg = self.condition['propertyerror']
else:
value = None
elif None in value and not allow_none:
value = []
elif remove_duplicate_value:
new_value = []
for val in value:
if val not in new_value:
new_value.append(val)
value = new_value
return value
name = self.get_name()
if isinstance(calculated_expected, dict):
calc_values = calculated_expected.values()
else:
calc_values = [calculated_expected]
display_value = display_list([str(val) for val in calc_values],
'or',
add_quote=True)
msg = self.build_arg(name, display_value)
elif self.has_condition_kwargs():
msgs = []
for key, value in calculated_expected.items():
name = self.get_indexed_name(key)
msgs.append(self.build_arg(name, f'"{value}"'))
msg = display_list(msgs, self.condition_operator.lower())
else:
return [f'"{action}"']
return [f'"{action}" ({msg})']
return
## calc_properties.setdefault(action, []).append(msg)
calc_value = CalcValue()
calc_value.__name__ = 'calc_value'
calc_value_property_help = CalcValuePropertyHelp()
calc_value_property_help.__name__ = 'calc_value_property_help'

View File

@ -29,7 +29,7 @@ from ..i18n import _
from ..setting import undefined, Settings
from ..value import Values
from ..error import ConfigError, display_list
from ..function import Params, ParamContext, ParamOption, ParamIndex
from ..autolib import Calculation, Params, ParamContext, ParamOption, ParamIndex
STATIC_TUPLE = frozenset()
@ -83,16 +83,15 @@ class Base:
requires = undefined
if properties is None:
properties = frozenset()
if isinstance(properties, tuple):
elif isinstance(properties, tuple):
properties = frozenset(properties)
if is_multi and 'empty' not in properties:
# if option is a multi, it cannot be "empty" (None not allowed in the list)
# "empty" is removed for follower's option
properties = properties | {'empty'}
if not isinstance(properties, frozenset):
raise TypeError(_('invalid properties type {0} for {1},'
' must be a frozenset').format(type(properties),
name))
assert isinstance(properties, frozenset), _('invalid properties type {0} for {1},'
' must be a frozenset').format(type(properties),
name)
self.validate_properties(name,
calc_properties,
properties)
@ -115,6 +114,17 @@ class Base:
raise ValueError(_('conflict: properties already set in requirement {0} for {1}'
'').format(display_list(set_forbidden_properties, add_quote=True),
name))
assert isinstance(properties, frozenset), _('invalid properties type {0} for {1},'
' must be a frozenset').format(type(properties),
name)
for prop in properties:
if not isinstance(prop, str):
if not isinstance(prop, Calculation):
raise ValueError(_('invalid property type {0} for {1}, must be a string or a Calculation').format(type(prop), name))
params = prop.params
for param in chain(params.args, params.kwargs.values()):
if isinstance(param, ParamOption):
param.option._add_dependency(self)
def _get_function_args(self,
function: Callable) -> Tuple[Set[str], Set[str], bool, bool]:
@ -159,7 +169,7 @@ class Base:
"""
:add_value: add value as first argument for validator
"""
assert isinstance(calculator, FunctionType), _('{0} must be a function').format(type_)
assert isinstance(calculator, Callable), _('{0} must be a function').format(type_)
if calculator_params is not None:
assert isinstance(calculator_params, Params), _('{0}_params must be a params'
'').format(type_)

View File

@ -32,7 +32,7 @@ from .syndynoptiondescription import SynDynLeadership
from .baseoption import BaseOption
from .option import Option
from ..error import RequirementError
from ..function import ParamOption
from ..autolib import ParamOption
class Leadership(OptionDescription):
@ -97,11 +97,8 @@ class Leadership(OptionDescription):
raise RequirementError(_('leader {} have requirement, but Leadership {} too'
'').format(leader.impl_getname(),
self.impl_getname()))
leader_calproperties = getattr(leader, '_calc_properties', None)
leader_calproperties = getattr(leader, '_requires', None)
if leader_calproperties:
if __debug__ and properties is not None:
self.validate_properties(name, leader_calproperties, frozenset(properties))
setattr(self, '_calc_properties', leader_calproperties)
setattr(self, '_requires', leader_requires)
delattr(leader, '_requires')
if __debug__:

View File

@ -26,10 +26,9 @@ from typing import Any, List, Callable, Optional, Dict, Union, Tuple
from .baseoption import BaseOption, submulti, STATIC_TUPLE
from ..i18n import _
from ..setting import undefined, OptionBag, Undefined
from ..autolib import carry_out_calculation
from ..autolib import carry_out_calculation, Params, ParamValue
from ..error import (ConfigError, ValueWarning, ValueErrorWarning, PropertiesOptionError,
ValueOptionError, display_list)
from ..function import Params, ParamValue
from .syndynoption import SynDynOption
ALLOWED_CONST_LIST = ['_cons_not_equal']

View File

@ -38,6 +38,9 @@ class SynDynOptionDescription:
subpath: str,
suffix: str) -> None:
self._opt = opt
if subpath is None:
subpath = ''
assert isinstance(subpath, str), 'subpath must be a string, not {}'.format(type(subpath))
self._subpath = subpath
self._suffix = suffix

View File

@ -415,13 +415,13 @@ class Settings(object):
search_properties=None):
"""
"""
opt = option_bag.option
# FIXME search_properties
option = option_bag.option
config_bag = option_bag.config_bag
path = option_bag.path
if option.impl_is_symlinkoption():
option = option.impl_getopt()
path = option.impl_getpath()
index = option_bag.index
if opt.impl_is_symlinkoption():
opt = opt.impl_getopt()
path = opt.impl_getpath()
if apply_requires:
cache = config_bag.context._impl_properties_cache
@ -435,13 +435,28 @@ class Settings(object):
else:
is_cached = False
if not is_cached:
props = self._p_.getproperties(path,
opt.impl_getproperties())
props = set()
for prop in self._p_.getproperties(path,
option.impl_getproperties()):
if isinstance(prop, str):
props.add(prop)
elif apply_requires:
new_props = prop.execute(option_bag,
leadership_must_have_index=True)
if not new_props:
continue
elif not isinstance(new_props, str):
raise ValueError(_('invalid property type {} for {} with {} function').format(type(new_props),
option_bag.option.impl_getname(),
prop.function.__name__))
props.add(new_props)
# else:
# props.update(new_props)
if apply_requires:
props |= self.apply_requires(option_bag,
False,
search_properties=search_properties)
props -= self.getpermissives(opt,
props -= self.getpermissives(option,
path)
#if apply_requires and config_bag.properties == config_bag.true_properties:
if apply_requires and not config_bag.is_unrestraint:
@ -453,6 +468,29 @@ class Settings(object):
True)
return props
def get_calculated_properties(self,
option_bag):
opt = option_bag.option
if opt.impl_is_symlinkoption():
opt = opt.impl_getopt()
path = opt.impl_getpath()
for prop in self._p_.getproperties(path,
opt.impl_getproperties()):
if not isinstance(prop, str):
yield prop
def has_properties_index(self,
option_bag):
opt = option_bag.option
if opt.impl_is_symlinkoption():
opt = opt.impl_getopt()
path = opt.impl_getpath()
for prop in self._p_.getproperties(path,
opt.impl_getproperties()):
if not isinstance(prop, str) and prop.has_index:
return True
return False
def get_context_permissives(self):
return self.getpermissives(None, None)
@ -670,7 +708,6 @@ class Settings(object):
"""save properties for specified path
(never save properties if same has option properties)
"""
# should have index !!!
opt = option_bag.option
if opt.impl_getrequires() is not None:
not_allowed_props = properties & \

View File

@ -31,7 +31,7 @@ from os.path import split
from typing import Dict
from ..error import ConfigError
from ..i18n import _
from .util import Cache
from .cacheobj import Cache
DEFAULT_STORAGE = MEMORY_STORAGE = 'dictionary'

View File

@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
"utils used by storage"
"cache used by storage"
# Copyright (C) 2013-2019 Team tiramisu (see AUTHORS for all contributors)
#
# This program is free software: you can redistribute it and/or modify it

View File

@ -15,10 +15,11 @@
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# ____________________________________________________________
import sqlite3
import warnings
from ...i18n import _
from os.path import join
import sqlite3
from ...error import ConflictError
@ -55,9 +56,16 @@ def _gen_filename():
def list_sessions():
cursor = CONN.cursor()
names = [row[0] for row in cursor.execute("SELECT session FROM session").fetchall()]
return names
if not CONN:
warnings.warn_explicit(Warning(_('Cannot list sessions, please connect to database first')),
category=Warning,
filename=__file__,
lineno=63)
return []
else:
cursor = CONN.cursor()
names = [row[0] for row in cursor.execute("SELECT session FROM session").fetchall()]
return names
def delete_session(session_id,

View File

@ -19,8 +19,7 @@ import weakref
from typing import Optional, Any, Callable
from .error import ConfigError, PropertiesOptionError, RequirementError
from .setting import owners, undefined, forbidden_owners, OptionBag, ConfigBag
from .autolib import carry_out_calculation
from .function import Params
from .autolib import carry_out_calculation, Params
from .i18n import _