simplify consistency validation

reorder function (logical order)
remove duplicate arity (context, config_bag, option_bag, ...)
master cannot have length lower than slave length
This commit is contained in:
Emmanuel Garette 2018-09-16 11:40:54 +02:00
parent 1d6e0c0dcd
commit b31a94e449
2 changed files with 317 additions and 337 deletions

View File

@ -21,6 +21,7 @@
# ____________________________________________________________
import warnings
import weakref
from typing import Any, List, Callable, Optional
from .baseoption import OnlyOption, submulti, STATIC_TUPLE
from ..i18n import _
@ -197,7 +198,6 @@ class Option(OnlyOption):
# just to check propertieserror
self.valid_consistency(option_bag,
value,
context,
check_error,
is_warnings_only)
return
@ -302,7 +302,6 @@ class Option(OnlyOption):
if not is_warnings_only or not check_error:
self.valid_consistency(option_bag,
value,
context,
check_error,
is_warnings_only)
except ValueError as err:
@ -346,338 +345,6 @@ class Option(OnlyOption):
"accesses the Option's doc"
return self.impl_get_information('doc')
def _valid_consistencies(self,
other_opts,
init=True,
func=None):
if self.issubdyn():
dynod = self.getsubdyn()
else:
dynod = None
if self.impl_is_submulti():
raise ConfigError(_('cannot add consistency with submulti option'))
is_multi = self.impl_is_multi()
for opt in other_opts:
if isinstance(opt, weakref.ReferenceType):
opt = opt()
if opt.impl_is_submulti(): # pragma: no cover
raise ConfigError(_('cannot add consistency with submulti option'))
if not isinstance(opt, Option): # pragma: no cover
raise ConfigError(_('consistency must be set with an option, not {}').format(opt))
if opt.issubdyn():
if dynod is None:
raise ConfigError(_('almost one option in consistency is '
'in a dynoptiondescription but not all'))
subod = opt.getsubdyn()
if dynod != subod:
raise ConfigError(_('option in consistency must be in same'
' dynoptiondescription'))
dynod = subod
elif dynod is not None:
raise ConfigError(_('almost one option in consistency is in a '
'dynoptiondescription but not all'))
if self is opt:
raise ConfigError(_('cannot add consistency with itself'))
if is_multi != opt.impl_is_multi():
raise ConfigError(_('every options in consistency must be '
'multi or none'))
if init:
# FIXME
if func != 'not_equal':
opt._has_dependency = True
def impl_add_consistency(self,
func,
*other_opts,
**params):
"""Add consistency means that value will be validate with other_opts
option's values.
:param func: function's name
:type func: `str`
:param other_opts: options used to validate value
:type other_opts: `list` of `tiramisu.option.Option`
:param params: extra params (warnings_only and transitive are allowed)
"""
if self.impl_is_readonly():
raise AttributeError(_("'{0}' ({1}) cannot add consistency, option is"
" read-only").format(
self.__class__.__name__,
self.impl_getname()))
self._valid_consistencies(other_opts,
func=func)
func = '_cons_{0}'.format(func)
if func not in dir(self):
raise ConfigError(_('consistency {0} not available for this option').format(func))
options = [weakref.ref(self)]
for option in other_opts:
options.append(weakref.ref(option))
all_cons_opts = tuple(options)
unknown_params = set(params.keys()) - set(['warnings_only', 'transitive'])
if unknown_params != set():
raise ValueError(_('unknown parameter {0} in consistency').format(unknown_params))
self._add_consistency(func,
all_cons_opts,
params)
#validate default value when add consistency
option_bag = OptionBag()
option_bag.set_option(self,
undefined,
None,
undefined)
self.impl_validate(self.impl_getdefault(),
option_bag)
self.impl_validate(self.impl_getdefault(),
option_bag,
check_error=False)
if func != '_cons_not_equal':
#consistency could generate warnings or errors
self._has_dependency = True
for wopt in all_cons_opts:
opt = wopt()
if func in ALLOWED_CONST_LIST:
if getattr(opt, '_unique', undefined) == undefined:
opt._unique = True
if opt != self:
self._add_dependency(opt)
opt._add_dependency(self)
def valid_consistency(self,
option_bag,
value,
context,
check_error,
option_warnings_only):
if context is not undefined:
descr = context.cfgimpl_get_description()
# no consistency found at all
if descr._cache_consistencies is None:
return
# get consistencies for this option
if option_bag.option.impl_is_dynsymlinkoption():
consistencies = descr._cache_consistencies.get(option_bag.option.impl_getopt())
else:
consistencies = descr._cache_consistencies.get(option_bag.option)
else:
# is no context, get consistencies in option
consistencies = option_bag.option.get_consistencies()
if consistencies:
if option_bag.config_bag is undefined:
cconfig_bag = undefined
else:
cconfig_bag = option_bag.config_bag.copy()
cconfig_bag.properties = cconfig_bag.properties - {'warnings'}
cconfig_bag.set_permissive()
if not option_bag.fromconsistency:
fromconsistency_is_empty = True
option_bag.fromconsistency = [cons_id for cons_id, f, a, p in consistencies]
else:
fromconsistency_is_empty = False
for cons_id, func, all_cons_opts, params in consistencies:
if not fromconsistency_is_empty and cons_id in option_bag.fromconsistency:
return
warnings_only = option_warnings_only or params.get('warnings_only', False)
if (warnings_only and not check_error) or (not warnings_only and check_error):
transitive = params.get('transitive', True)
#all_cons_opts[0] is the option where func is set
if option_bag.ori_option.impl_is_dynsymlinkoption():
opts = []
for opt in all_cons_opts:
opts.append(opt().impl_get_dynoption(option_bag.ori_option._rootpath,
option_bag.ori_option._suffix))
wopt = opts[0]
else:
opts = all_cons_opts
wopt = opts[0]()
wopt.launch_consistency(self,
func,
cons_id,
option_bag,
value,
context,
opts,
warnings_only,
transitive,
cconfig_bag)
if fromconsistency_is_empty:
option_bag.fromconsistency = []
def _get_consistency_value(self,
index,
current_option,
fromoption,
fromconsistency,
cons_id,
context,
config_bag,
value,
func):
if func in ALLOWED_CONST_LIST:
index = None
index_ = None
elif not current_option.impl_is_master_slaves('slave'):
index_ = None
else:
index_ = index
if fromoption == current_option:
# orig_option is current option
# we have already value, so use it
return value
if context is undefined:
#if no context get default value
return current_option.impl_getdefault()
#otherwise calculate value
path = current_option.impl_getpath()
coption_bag = OptionBag()
coption_bag.set_option(current_option,
path,
index_,
config_bag)
fromconsistency.append(cons_id)
coption_bag.fromconsistency = fromconsistency
opt_value = context.getattr(path,
coption_bag)
if index_ is None and index is not None:
if len(opt_value) <= index:
opt_value = current_option.impl_getdefault_multi()
else:
opt_value = opt_value[index]
return opt_value
def launch_consistency(self,
current_opt,
func,
cons_id,
option_bag,
value,
context,
opts,
warnings_only,
transitive,
config_bag):
"""Launch consistency now
:param func: function name, this name should start with _cons_
:type func: `str`
:param option: option that value is changing
:type option: `tiramisu.option.Option`
:param value: new value of this opion
:param context: Config's context, if None, check default value instead
:type context: `tiramisu.config.Config`
:param index: only for multi option, consistency should be launch for
specified index
:type index: `int`
:param opts: all options concerne by this consistency
:type opts: `list` of `tiramisu.option.Option`
:param warnings_only: specific raise error for warning
:type warnings_only: `boolean`
:param transitive: propertyerror is transitive
:type transitive: `boolean`
"""
all_cons_vals = []
all_cons_opts = []
length = None
for opt in opts:
if isinstance(opt, weakref.ReferenceType):
opt = opt()
try:
opt_value = self._get_consistency_value(option_bag.index,
opt,
option_bag.ori_option,
option_bag.fromconsistency.copy(),
cons_id,
context,
config_bag,
value,
func)
except PropertiesOptionError as err:
if debug: # pragma: no cover
log.debug('propertyerror in launch_consistency: {0}'.format(err))
if transitive:
err.set_orig_opt(option_bag.option)
raise err
else:
if opt.impl_is_multi() and option_bag.index is None and \
func not in ALLOWED_CONST_LIST:
len_value = len(opt_value)
if length is not None and length != len_value:
if context is undefined:
return
raise ValueError(_('unexpected length of "{}" in constency "{}", '
'should be "{}"').format(len(opt_value),
opt.impl_get_display_name(),
length)) # pragma: no cover
length = len_value
if isinstance(opt_value, list) and func in ALLOWED_CONST_LIST:
for value_ in opt_value:
all_cons_vals.append(value_)
all_cons_opts.append(opt)
else:
all_cons_vals.append(opt_value)
all_cons_opts.append(opt)
if config_bag is not undefined and not 'validator' in config_bag.properties:
return
all_values = []
if length is None:
all_values = [all_cons_vals]
elif length:
all_values = zip(*all_cons_vals)
try:
for values in all_values:
getattr(self, func)(current_opt,
all_cons_opts,
values,
warnings_only,
context)
except ValueError as err:
if warnings_only:
msg = _('attention, "{0}" could be an invalid {1} for "{2}", {3}'
'').format(value,
self._display_name,
current_opt.impl_get_display_name(),
err)
warnings.warn_explicit(ValueWarning(msg, weakref.ref(self)),
ValueWarning,
self.__class__.__name__, 0)
else:
raise err
def _cons_not_equal(self,
current_opt,
opts,
vals,
warnings_only,
context):
equal = []
is_current = False
for idx_inf, val_inf in enumerate(vals):
for idx_sup, val_sup in enumerate(vals[idx_inf + 1:]):
if val_inf == val_sup is not None:
for opt_ in [opts[idx_inf], opts[idx_inf + idx_sup + 1]]:
if opt_ == current_opt:
is_current = True
else:
if opt_ not in equal:
equal.append(opt_)
if equal:
if debug: # pragma: no cover
log.debug(_('_cons_not_equal: {} are not different').format(display_list(equal)))
if is_current:
if warnings_only:
msg = _('should be different from the value of "{}"')
else:
msg = _('must be different from the value of "{}"')
else:
if warnings_only:
msg = _('value for {} should be different')
else:
msg = _('value for {} must be different')
equal_name = []
for opt in equal:
equal_name.append(opt.impl_get_display_name())
raise ValueError(msg.format(display_list(list(equal_name))))
def _second_level_validation(self,
value,
warnings_only):
@ -737,14 +404,110 @@ class Option(OnlyOption):
def impl_allow_empty_list(self):
return getattr(self, '_allow_empty_list', undefined)
#____________________________________________________________
# consistency
# consistencies
def impl_add_consistency(self,
func,
*other_opts,
**params):
"""Add consistency means that value will be validate with other_opts
option's values.
:param func: function's name
:type func: `str`
:param other_opts: options used to validate value
:type other_opts: `list` of `tiramisu.option.Option`
:param params: extra params (warnings_only and transitive are allowed)
"""
if self.impl_is_readonly():
raise AttributeError(_("'{0}' ({1}) cannot add consistency, option is"
" read-only").format(
self.__class__.__name__,
self.impl_getname()))
self._valid_consistencies(other_opts,
func=func)
func = '_cons_{0}'.format(func)
if func not in dir(self):
raise ConfigError(_('consistency {0} not available for this option').format(func))
options = [weakref.ref(self)]
for option in other_opts:
options.append(weakref.ref(option))
all_cons_opts = tuple(options)
unknown_params = set(params.keys()) - set(['warnings_only', 'transitive'])
if unknown_params != set():
raise ValueError(_('unknown parameter {0} in consistency').format(unknown_params))
self._add_consistency(func,
all_cons_opts,
params)
#validate default value when add consistency
option_bag = OptionBag()
option_bag.set_option(self,
undefined,
None,
undefined)
self.impl_validate(self.impl_getdefault(),
option_bag)
self.impl_validate(self.impl_getdefault(),
option_bag,
check_error=False)
if func != '_cons_not_equal':
#consistency could generate warnings or errors
self._has_dependency = True
for wopt in all_cons_opts:
opt = wopt()
if func in ALLOWED_CONST_LIST:
if getattr(opt, '_unique', undefined) == undefined:
opt._unique = True
if opt != self:
self._add_dependency(opt)
opt._add_dependency(self)
def _valid_consistencies(self,
other_opts,
init=True,
func=None):
if self.issubdyn():
dynod = self.getsubdyn()
else:
dynod = None
if self.impl_is_submulti():
raise ConfigError(_('cannot add consistency with submulti option'))
is_multi = self.impl_is_multi()
for opt in other_opts:
if isinstance(opt, weakref.ReferenceType):
opt = opt()
if opt.impl_is_submulti(): # pragma: no cover
raise ConfigError(_('cannot add consistency with submulti option'))
if not isinstance(opt, Option): # pragma: no cover
raise ConfigError(_('consistency must be set with an option, not {}').format(opt))
if opt.issubdyn():
if dynod is None:
raise ConfigError(_('almost one option in consistency is '
'in a dynoptiondescription but not all'))
subod = opt.getsubdyn()
if dynod != subod:
raise ConfigError(_('option in consistency must be in same'
' dynoptiondescription'))
dynod = subod
elif dynod is not None:
raise ConfigError(_('almost one option in consistency is in a '
'dynoptiondescription but not all'))
if self is opt:
raise ConfigError(_('cannot add consistency with itself'))
if is_multi != opt.impl_is_multi():
raise ConfigError(_('every options in consistency must be '
'multi or none'))
if init:
# FIXME
if func != 'not_equal':
opt._has_dependency = True
def _add_consistency(self,
func,
all_cons_opts,
params):
cons = (None, func, all_cons_opts, params)
cons = (-1, func, all_cons_opts, params)
consistencies = getattr(self, '_consistencies', None)
if consistencies is None:
self._consistencies = [cons]
@ -760,6 +523,218 @@ class Option(OnlyOption):
return False
return self in descr._cache_consistencies
def valid_consistency(self,
option_bag,
value,
check_error,
option_warnings_only):
if option_bag.config_bag is not undefined:
descr = option_bag.config_bag.context.cfgimpl_get_description()
# no consistency found at all
if descr._cache_consistencies is None:
return
# get consistencies for this option
if option_bag.option.impl_is_dynsymlinkoption():
consistencies = descr._cache_consistencies.get(option_bag.option.impl_getopt())
else:
consistencies = descr._cache_consistencies.get(option_bag.option)
else:
# is no context, get consistencies in option
consistencies = option_bag.option.get_consistencies()
if consistencies:
if option_bag.config_bag is undefined:
coption_bag = option_bag.copy()
else:
cconfig_bag = option_bag.config_bag.copy()
cconfig_bag.remove_warnings()
cconfig_bag.set_permissive()
coption_bag = option_bag.copy()
coption_bag.config_bag = cconfig_bag
if not option_bag.fromconsistency:
fromconsistency_is_empty = True
option_bag.fromconsistency = [cons_id for cons_id, f, a, p in consistencies]
else:
fromconsistency_is_empty = False
for cons_id, func, all_cons_opts, params in consistencies:
if not fromconsistency_is_empty and cons_id in option_bag.fromconsistency:
continue
warnings_only = option_warnings_only or params.get('warnings_only', False)
if (warnings_only and not check_error) or (not warnings_only and check_error):
transitive = params.get('transitive', True)
#all_cons_opts[0] is the option where func is set
if option_bag.ori_option.impl_is_dynsymlinkoption():
opts = []
for opt in all_cons_opts:
opts.append(opt().impl_get_dynoption(option_bag.ori_option._rootpath,
option_bag.ori_option._suffix))
wopt = opts[0]
else:
opts = all_cons_opts
wopt = opts[0]()
wopt.launch_consistency(self,
func,
cons_id,
coption_bag,
value,
opts,
warnings_only,
transitive)
if fromconsistency_is_empty:
option_bag.fromconsistency = []
def launch_consistency(self,
current_opt: OnlyOption,
func: Callable,
cons_id: int,
option_bag: OptionBag,
value: Any,
opts: List[OnlyOption],
warnings_only: bool,
transitive: bool):
"""Launch consistency now
"""
all_cons_vals = []
all_cons_opts = []
length = None
for opt in opts:
if isinstance(opt, weakref.ReferenceType):
opt = opt()
try:
opt_value = self.get_consistency_value(option_bag,
opt,
cons_id,
value,
func)
except PropertiesOptionError as err:
if debug: # pragma: no cover
log.debug('propertyerror in launch_consistency: {0}'.format(err))
if transitive:
err.set_orig_opt(option_bag.option)
raise err
else:
if opt.impl_is_multi() and option_bag.index is None and \
func not in ALLOWED_CONST_LIST:
len_value = len(opt_value)
if length is not None and length != len_value:
if option_bag.config_bag is undefined:
return
raise ValueError(_('unexpected length of "{}" in constency "{}", '
'should be "{}"').format(len(opt_value),
opt.impl_get_display_name(),
length)) # pragma: no cover
length = len_value
if isinstance(opt_value, list) and func in ALLOWED_CONST_LIST:
for value_ in opt_value:
all_cons_vals.append(value_)
all_cons_opts.append(opt)
else:
all_cons_vals.append(opt_value)
all_cons_opts.append(opt)
if option_bag.config_bag is not undefined and \
not 'validator' in option_bag.config_bag.properties:
return
all_values = []
if length is None:
all_values = [all_cons_vals]
elif length:
all_values = zip(*all_cons_vals)
try:
context = option_bag.config_bag if option_bag.config_bag is undefined else option_bag.config_bag.context
for values in all_values:
getattr(self, func)(current_opt,
all_cons_opts,
values,
warnings_only,
context)
except ValueError as err:
if warnings_only:
msg = _('attention, "{0}" could be an invalid {1} for "{2}", {3}'
'').format(value,
self._display_name,
current_opt.impl_get_display_name(),
err)
warnings.warn_explicit(ValueWarning(msg, weakref.ref(self)),
ValueWarning,
self.__class__.__name__, 0)
else:
raise err
def get_consistency_value(self,
option_bag,
current_option,
cons_id,
value,
func):
if func in ALLOWED_CONST_LIST:
index = None
index_ = None
elif current_option.impl_is_master_slaves('master'):
index = option_bag.index
index_ = None
else:
index = option_bag.index
index_ = index
if option_bag.ori_option == current_option:
# orig_option is current option
# we have already value, so use it
return value
if option_bag.config_bag is undefined:
#if no context get default value
return current_option.impl_getdefault()
#otherwise calculate value
path = current_option.impl_getpath()
coption_bag = OptionBag()
coption_bag.set_option(current_option,
path,
index_,
option_bag.config_bag)
fromconsistency = option_bag.fromconsistency.copy()
fromconsistency.append(cons_id)
coption_bag.fromconsistency = fromconsistency
current_value = option_bag.config_bag.context.getattr(path,
coption_bag)
if index_ is None and index is not None:
#if self is a slave and current_option is a master and func not in ALLOWED_CONST_LIST
#return only the value of the master for isolate slave
current_value = current_value[index]
return current_value
def _cons_not_equal(self,
current_opt,
opts,
vals,
warnings_only,
context):
equal = []
is_current = False
for idx_inf, val_inf in enumerate(vals):
for idx_sup, val_sup in enumerate(vals[idx_inf + 1:]):
if val_inf == val_sup is not None:
for opt_ in [opts[idx_inf], opts[idx_inf + idx_sup + 1]]:
if opt_ == current_opt:
is_current = True
else:
if opt_ not in equal:
equal.append(opt_)
if equal:
if debug: # pragma: no cover
log.debug(_('_cons_not_equal: {} are not different').format(display_list(equal)))
if is_current:
if warnings_only:
msg = _('should be different from the value of "{}"')
else:
msg = _('must be different from the value of "{}"')
else:
if warnings_only:
msg = _('value for {} should be different')
else:
msg = _('value for {} must be different')
equal_name = []
for opt in equal:
equal_name.append(opt.impl_get_display_name())
raise ValueError(msg.format(display_list(list(equal_name))))
class RegexpOption(Option):
__slots__ = tuple()

View File

@ -165,6 +165,8 @@ class OptionBag:
kwargs = {}
option_bag = OptionBag()
for key in self.__slots__:
if key == 'properties' and self.config_bag is undefined:
continue
setattr(option_bag, key, getattr(self, key))
return option_bag
@ -188,6 +190,9 @@ class ConfigBag:
return self.permissives
raise KeyError('unknown key {} for ConfigBag'.format(key)) # pragma: no cover
def remove_warnings(self):
self.properties = frozenset(self.properties - {'warnings'})
def remove_validation(self):
self.properties = frozenset(self.properties - {'validator'})