# -*- coding: utf-8 -*-
"option types and option description"
# Copyright (C) 2012-2017 Team tiramisu (see AUTHORS for all contributors)
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by the
# Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# The original `Config` design model is unproudly borrowed from
# the rough pypy's guys: http://codespeak.net/svn/pypy/dist/pypy/config/
# the whole pypy projet is under MIT licence
# ____________________________________________________________
import warnings
import weakref

from .baseoption import OnlyOption, submulti, validate_calculator, STATIC_TUPLE
from .symlinkoption import DynSymLinkOption
from ..i18n import _
from ..setting import log, undefined, debug
from ..autolib import carry_out_calculation
from ..error import (ConfigError, ValueWarning, PropertiesOptionError,
                     display_list)
from itertools import combinations

# consistency functions that compare every single value of every option
# (lists are flattened) instead of comparing values index by index
ALLOWED_CONST_LIST = ['_cons_not_equal']


class Option(OnlyOption):
    """
    Abstract base class for configuration option's.

    Reminder: an Option object is **not** a container for the value.
    """
    __slots__ = ('_extra',
                 '_warnings_only',
                 '_allow_empty_list',
                 #multi
                 '_multi',
                 '_unique',
                 #value
                 '_default',
                 '_default_multi',
                 #calculation
                 '_val_call',
                 #
                 '_master_slaves',
                 '_choice_values',
                 '_choice_values_params',
                 )
    _empty = ''

    def __init__(self, name, doc, default=undefined, default_multi=None,
                 requires=None, multi=False, unique=undefined, callback=None,
                 callback_params=None, validator=None, validator_params=None,
                 properties=None, warnings_only=False, extra=None,
                 allow_empty_list=undefined):
        """Build the option and validate its default value(s).

        :param name: option's name (forwarded to the parent class)
        :param doc: option's description (forwarded to the parent class)
        :param default: default value; when left undefined it becomes `None`
                        for a non-multi option and `[]` otherwise
        :param default_multi: default used for a missing index of a multi
                              option; must be a list for a submulti
        :param requires: forwarded to the parent class
        :param multi: `False`, `True` or `submulti`
        :param unique: boolean, only meaningful for a multi option
        :param callback: forwarded to `impl_set_callback`
        :param callback_params: forwarded to `impl_set_callback`
        :param validator: extra validation function
        :param validator_params: parameters for `validator`
        :param properties: forwarded to the parent class
        :param warnings_only: stored as `_warnings_only` when `True`
        :param extra: stored as `_extra` when not `None`
        :param allow_empty_list: stored as `_allow_empty_list` when defined
        :raises ValueError: on inconsistent arguments or invalid default
        """
        # attributes are written through object.__setattr__ on purpose
        _setattr = object.__setattr__
        if not multi and default_multi is not None:
            raise ValueError(_("default_multi is set whereas multi is False"
                               " in option: {0}").format(name))
        if default is undefined:
            if multi is False:
                default = None
            else:
                default = []
        # `_multi` encoding: 0 -> multi, 1 -> not multi, `submulti` -> submulti
        if multi is True:
            is_multi = True
            _multi = 0
        elif multi is False:
            is_multi = False
            _multi = 1
        elif multi is submulti:
            is_multi = True
            _multi = submulti
        else:
            raise ValueError(_('invalid multi value'))
        # 1 is the value assumed by the getters when the slot is unset
        if _multi != 1:
            _setattr(self, '_multi', _multi)
        if multi is not False and default is None:
            default = []

        super(Option, self).__init__(name,
                                     doc,
                                     requires=requires,
                                     properties=properties,
                                     is_multi=is_multi)
        if validator is not None:
            validate_calculator(validator, validator_params, 'validator', self)
            validator_params = self._build_calculator_params(validator,
                                                             validator_params,
                                                             add_value=True)
            if validator_params is None:
                val_call = (validator,)
            else:
                val_call = (validator, validator_params)
            self._val_call = (val_call, None)
        if extra is not None:
            _setattr(self, '_extra', extra)
        if unique != undefined and not isinstance(unique, bool):
            raise ValueError(_('unique must be a boolean'))
        if not is_multi and unique is True:
            raise ValueError(_('unique must be set only with multi value'))
        if warnings_only is True:
            _setattr(self, '_warnings_only', warnings_only)
        if allow_empty_list is not undefined:
            _setattr(self, '_allow_empty_list', allow_empty_list)

        if is_multi and default_multi is not None:
            def test_multi_value(value):
                # validate one default_multi value with the option's own
                # `_validate` and reword the error
                try:
                    self._validate(value, undefined)
                except ValueError as err:
                    raise ValueError(_("invalid default_multi value {0} "
                                       "for option {1}: {2}").format(str(value),
                                                                    self.impl_getname(),
                                                                    str(err)))
            if _multi is submulti:
                if not isinstance(default_multi, list):
                    raise ValueError(_("invalid default_multi value {0} "
                                       "for option {1}: must be a list for a submulti"
                                       "").format(str(default_multi),
                                                  self.impl_getname()))
                for value in default_multi:
                    test_multi_value(value)
            else:
                test_multi_value(default_multi)
            _setattr(self, '_default_multi', default_multi)
        if unique is not undefined:
            _setattr(self, '_unique', unique)
        self.impl_validate(default, is_multi=is_multi, config_bag=undefined)
        # only store a default that differs from the implicit one
        if (is_multi and default != []) or \
                (not is_multi and default is not None):
            if is_multi:
                # stored as a tuple; impl_getdefault() gives back a list
                default = tuple(default)
            _setattr(self, '_default', default)

        self.impl_set_callback(callback, callback_params, _init=True)

    def impl_is_multi(self):
        "True if the option is multi or submulti"
        return getattr(self, '_multi', 1) != 1

    def _validate(self, *args, **kwargs):
        "type validation hook: does nothing here, overridden by subclasses"
        pass

    def impl_is_unique(self):
        "value of the `_unique` attribute, False when it was never set"
        return getattr(self, '_unique', False)

    def impl_get_validator(self):
        """Return the validator as a `(function, params)` pair.

        `(None, {})` is returned when no validator is set and `params` is
        `{}` when the validator was stored without parameters.
        """
        val = getattr(self, '_val_call', (None,))[0]
        if val is None:
            ret_val = (None, {})
        elif len(val) == 1:
            ret_val = (val[0], {})
        else:
            ret_val = val
        return ret_val

    def impl_validate(self, value, config_bag, context=undefined,
                      force_index=None, current_opt=undefined, is_multi=None,
                      check_error=True, multi=None):
        """Validate `value` (type, uniqueness, validator and consistencies).

        :param value: the option's value
        :param config_bag: object exposing `setting_properties`, or
                           `undefined`; when defined, validation is skipped
                           if 'validator' (errors) or 'warnings' (warnings)
                           is not in those properties
        :param context: Config's context
        :type context: :class:`tiramisu.config.Config`
        :param force_index: if multi, value has to be a list
                            not if force_index is not None
        :type force_index: integer
        :param current_opt: option reported to validators and consistencies,
                            defaults to `self`
        :param is_multi: override `impl_is_multi()` when not None
        :param check_error: if True an invalid value raises `ValueError`,
                            otherwise a `ValueWarning` is emitted
        :param multi: the other values of the multi, used for the uniqueness
                      test when `force_index` is set
        :raises ValueError: if the value is invalid and `check_error` is True
        """
        if current_opt is undefined:
            current_opt = self

        if config_bag is not undefined and \
                ((check_error is True and 'validator' not in config_bag.setting_properties) or
                 (check_error is False and 'warnings' not in config_bag.setting_properties)):
            return

        def _is_not_unique(value):
            #FIXME why must the lengths be equal ???
            if check_error and self.impl_is_unique() and len(set(value)) != len(value):
                for idx, val in enumerate(value):
                    if val in value[idx+1:]:
                        raise ValueError(_('invalid value "{}", this value is already in "{}"'
                                           '').format(val,
                                                      self.impl_get_display_name()))

        def calculation_validator(val, _index):
            validator, validator_params = self.impl_get_validator()
            if validator is not None:
                if validator_params != {}:
                    # work on a copy, the stored params must stay untouched
                    validator_params_ = {}
                    for val_param, values in validator_params.items():
                        validator_params_[val_param] = values
                    #inject value in calculation
                    if '' in validator_params_:
                        lst = list(validator_params_[''])
                        lst.insert(0, val)
                        validator_params_[''] = tuple(lst)
                    else:
                        validator_params_[''] = (val,)
                else:
                    validator_params_ = {'': (val,)}
                # Raise ValueError if not valid
                carry_out_calculation(current_opt,
                                      context=context,
                                      callback=validator,
                                      callback_params=validator_params_,
                                      index=_index,
                                      orig_value=value,
                                      config_bag=config_bag,
                                      is_validator=True)

        def do_validation(_value, _index):
            if isinstance(_value, list):  # pragma: no cover
                raise ValueError(_('invalid value "{}" for "{}" '
                                   'which must not be a list').format(_value,
                                                                      self.impl_get_display_name()))
            #FIXME to be reviewed ...
            if _value is not None:
                if check_error:
                    # option validation
                    self._validate(_value,
                                   config_bag,
                                   current_opt)
                # the validator runs in error mode for a regular option and
                # in warning mode for a warnings_only option
                if ((check_error and not is_warnings_only) or
                        (not check_error and is_warnings_only)):
                    calculation_validator(_value,
                                          _index)
                    self._second_level_validation(_value,
                                                  is_warnings_only)

        if is_multi is None:
            is_multi = self.impl_is_multi()

        is_warnings_only = getattr(self, '_warnings_only', False)
        # position reached in a multi value, only used by the debug log
        idx = None
        try:
            val = value
            if not is_multi:
                do_validation(val, None)
            elif force_index is not None:
                if self.impl_is_submulti():
                    _is_not_unique(value)
                    for val in value:
                        do_validation(val,
                                      force_index)
                else:
                    if multi is not None and self.impl_is_unique() and value in multi:
                        # do not compare the value with the one it replaces
                        if not self.impl_is_submulti() and len(multi) - 1 >= force_index:
                            lst = list(multi)
                            lst.pop(force_index)
                        else:
                            lst = multi
                        if value in lst:
                            raise ValueError(_('invalid value "{}", this value is already'
                                               ' in "{}"').format(value,
                                                                  self.impl_get_display_name()))
                    do_validation(val,
                                  force_index)
            elif not isinstance(value, list):
                raise ValueError(_('invalid value "{0}" for "{1}" which '
                                   'must be a list').format(value,
                                                            self.impl_getname()))
            elif self.impl_is_submulti():
                for idx, lval in enumerate(value):
                    # check the type first: the uniqueness test needs a
                    # sequence and would fail with a TypeError otherwise
                    if not isinstance(lval, list):
                        raise ValueError(_('invalid value "{0}" for "{1}" '
                                           'which must be a list of list'
                                           '').format(lval,
                                                      self.impl_getname()))
                    _is_not_unique(lval)
                    for val in lval:
                        do_validation(val,
                                      idx)
            else:
                _is_not_unique(value)
                for idx, val in enumerate(value):
                    do_validation(val,
                                  idx)

            self._valid_consistency(current_opt,
                                    value,
                                    context,
                                    force_index,
                                    check_error,
                                    config_bag)
        except ValueError as err:
            if debug:  # pragma: no cover
                # `_index` only exists inside the helpers above, so log the
                # index tracked in this scope instead
                if force_index is not None:
                    err_index = force_index
                else:
                    err_index = idx
                log.debug('do_validation: value: {0}, index: {1}:'
                          ' {2}'.format(val, err_index, err),
                          exc_info=True)
            if is_warnings_only:
                msg = _('attention, "{0}" could be an invalid {1} for "{2}"'
                        '').format(val,
                                   self._display_name,
                                   self.impl_get_display_name())
            else:
                msg = _('"{0}" is an invalid {1} for "{2}"'
                        '').format(val,
                                   self._display_name,
                                   self.impl_get_display_name())
            err_msg = '{0}'.format(err)
            if err_msg:
                msg += ', {}'.format(err_msg)
            if check_error:
                raise ValueError(msg)
            else:
                warnings.warn_explicit(ValueWarning(msg, self),
                                       ValueWarning,
                                       self.__class__.__name__, 0)

    def impl_is_dynsymlinkoption(self):
        "always False for a plain Option"
        return False

    def impl_is_master_slaves(self, type_='both'):
        """Tell whether the option belongs to a master/slaves group.

        :param type_: 'master' to match only the master, 'slave' to match
                      only a non-master member, 'both' to match any member
        :returns: False if the option has no master/slaves group
        """
        master_slaves = self.impl_get_master_slaves()
        if master_slaves is not None:
            if type_ in ('both', 'master') and \
                    master_slaves.is_master(self):
                return True
            if type_ in ('both', 'slave') and \
                    not master_slaves.is_master(self):
                return True
        return False

    def impl_get_master_slaves(self):
        """Return the master/slaves object or None if there is none.

        `_master_slaves` is called to get the object (it looks like a weak
        reference -- to be confirmed where the attribute is set).
        """
        masterslave = getattr(self, '_master_slaves', None)
        if masterslave is None:
            return masterslave
        return masterslave()

    def impl_getdoc(self):
        "accesses the Option's doc"
        return self.impl_get_information('doc')

    def _valid_consistencies(self, other_opts, init=True, func=None):
        """Check that a consistency can link `self` and `other_opts`.

        :param other_opts: options (or weak references to options)
        :param init: if True, flag the other options with `_has_dependency`
                     (except for the 'not_equal' consistency)
        :param func: consistency name, without the `_cons_` prefix
        :raises ConfigError: submulti option, not an option, options not all
                             in the same dynoptiondescription, option linked
                             with itself or multi/non-multi mix
        """
        if self._is_subdyn():
            dynod = self._subdyn()
        else:
            dynod = None
        if self.impl_is_submulti():
            raise ConfigError(_('cannot add consistency with submulti option'))
        is_multi = self.impl_is_multi()
        for opt in other_opts:
            if isinstance(opt, weakref.ReferenceType):
                opt = opt()
            # test the type before calling any Option method, otherwise a
            # wrong object (or a dead reference) ends in an AttributeError
            if not isinstance(opt, Option):
                raise ConfigError(_('consistency must be set with an option, not {}').format(opt))
            if opt.impl_is_submulti():
                raise ConfigError(_('cannot add consistency with submulti option'))
            if opt._is_subdyn():
                if dynod is None:
                    raise ConfigError(_('almost one option in consistency is '
                                        'in a dynoptiondescription but not all'))
                subod = opt._subdyn()
                if dynod != subod:
                    raise ConfigError(_('option in consistency must be in same'
                                        ' dynoptiondescription'))
                dynod = subod
            elif dynod is not None:
                raise ConfigError(_('almost one option in consistency is in a '
                                    'dynoptiondescription but not all'))
            if self is opt:
                raise ConfigError(_('cannot add consistency with itself'))
            if is_multi != opt.impl_is_multi():
                raise ConfigError(_('every options in consistency must be '
                                    'multi or none'))
            if init:
                # FIXME
                if func != 'not_equal':
                    opt._has_dependency = True

    def impl_add_consistency(self, func, *other_opts, **params):
        """Add consistency means that value will be validate with other_opts
        option's values.

        :param func: function's name
        :type func: `str`
        :param other_opts: options used to validate value
        :type other_opts: `list` of `tiramisu.option.Option`
        :param params: extra params (warnings_only and transitive are allowed)
        :raises AttributeError: if the option is read-only
        :raises ConfigError: if the consistency is unknown or cannot link
                             those options
        :raises ValueError: unknown parameter, or default value rejected by
                            the consistency (which is then not kept)
        """
        if self.impl_is_readonly():
            raise AttributeError(_("'{0}' ({1}) cannot add consistency, option is"
                                   " read-only").format(
                                       self.__class__.__name__,
                                       self.impl_getname()))
        self._valid_consistencies(other_opts,
                                  func=func)
        func = '_cons_{0}'.format(func)
        if func not in dir(self):
            raise ConfigError(_('consistency {0} not available for this option').format(func))
        # only weak references are stored, `self` always comes first
        options = [weakref.ref(self)]
        for option in other_opts:
            options.append(weakref.ref(option))
        all_cons_opts = tuple(options)
        unknown_params = set(params.keys()) - set(['warnings_only', 'transitive'])
        if unknown_params != set():
            raise ValueError(_('unknown parameter {0} in consistency').format(unknown_params))
        self._add_consistency(func,
                              all_cons_opts,
                              params)
        #validate default value when add consistency
        try:
            self.impl_validate(self.impl_getdefault(), undefined)
            self.impl_validate(self.impl_getdefault(), undefined, check_error=False)
        except ValueError:
            # do not keep a consistency that the default value violates
            self._del_consistency()
            raise
        if func != '_cons_not_equal':
            #consistency could generate warnings or errors
            self._has_dependency = True
        for wopt in all_cons_opts:
            opt = wopt()
            if func in ALLOWED_CONST_LIST:
                # those consistencies make the options unique unless the
                # user explicitly chose a value for `unique`
                if getattr(opt, '_unique', undefined) == undefined:
                    opt._unique = True
            if opt != self:
                self._add_dependency(opt)
                opt._add_dependency(self)

    def _valid_consistency(self, option, value, context, index, check_error,
                           config_bag):
        """Launch every consistency registered for `option`.

        Only consistencies whose `warnings_only` parameter matches the
        current mode are launched: errors when `check_error` is True,
        warnings otherwise.

        :param option: option that value is changing
        :param value: new value of this option
        :param context: Config's context or `undefined`
        :param index: index of the value, for a multi option
        :param check_error: True to check errors, False to check warnings
        :param config_bag: forwarded to `_launch_consistency`
        """
        if context is not undefined:
            descr = context.cfgimpl_get_description()
            # no consistency found at all
            if descr._cache_consistencies is None:
                return
            # get consistencies for this option
            if isinstance(option, DynSymLinkOption):
                consistencies = descr._cache_consistencies.get(option.impl_getopt())
            else:
                consistencies = descr._cache_consistencies.get(option)
        else:
            # is no context, get consistencies in option
            consistencies = option._get_consistencies()
        if consistencies is not None:
            for cons_id, func, all_cons_opts, params in consistencies:
                warnings_only = params.get('warnings_only', False)
                if (warnings_only and not check_error) or (not warnings_only and check_error):
                    transitive = params.get('transitive', True)
                    #all_cons_opts[0] is the option where func is set
                    if isinstance(option, DynSymLinkOption):
                        opts = []
                        for opt in all_cons_opts:
                            opts.append(DynSymLinkOption(opt(),
                                                         option._rootpath,
                                                         option._suffix))
                        wopt = opts[0]
                    else:
                        opts = all_cons_opts
                        wopt = opts[0]()
                    wopt._launch_consistency(self,
                                             func,
                                             cons_id,
                                             option,
                                             value,
                                             context,
                                             index,
                                             opts,
                                             warnings_only,
                                             transitive,
                                             config_bag)

    def _launch_consistency(self, current_opt, func, cons_id, option, value,
                            context, index, opts, warnings_only, transitive,
                            config_bag):
        """Launch consistency now

        :param current_opt: option given to the consistency function and
                            named in the warning message
        :param func: function name, this name should start with _cons_
        :type func: `str`
        :param cons_id: consistency identifier, used to avoid launching again
                        a consistency that is already running
        :param option: option that value is changing
        :type option: `tiramisu.option.Option`
        :param value: new value of this option
        :param context: Config's context, if undefined, check default value
                        instead
        :type context: `tiramisu.config.Config`
        :param index: only for multi option, consistency should be launch for
                      specified index
        :type index: `int`
        :param opts: all options concerned by this consistency
        :type opts: `list` of `tiramisu.option.Option`
        :param warnings_only: specific raise error for warning
        :type warnings_only: `boolean`
        :param transitive: propertyerror is transitive
        :type transitive: `boolean`
        :param config_bag: object exposing `fromconsistency` and `copy()`,
                           or `undefined`
        """
        if context is not undefined:
            descr = context.cfgimpl_get_description()
        if config_bag is not undefined and cons_id in config_bag.fromconsistency:
            return
        all_cons_vals = []
        all_cons_opts = []
        length = None
        for opt in opts:
            if isinstance(opt, weakref.ReferenceType):
                opt = opt()
            if option == opt:
                # option is current option
                # we have already value, so use it
                opt_value = value
            elif context is undefined:
                opt_value = opt.impl_getdefault()
            else:
                #if context, calculate value, otherwise get default value
                sconfig_bag = config_bag.copy('nooption')
                sconfig_bag.option = opt
                sconfig_bag.fromconsistency.append(cons_id)
                sconfig_bag.force_permissive = True
                path = opt.impl_getpath(context)
                if opt.impl_is_master_slaves('slave'):
                    index_ = index
                else:
                    index_ = None
                try:
                    opt_value = context.getattr(path,
                                                index_,
                                                sconfig_bag,
                                                iter_slave=True)
                except PropertiesOptionError as err:
                    if debug:  # pragma: no cover
                        log.debug('propertyerror in _launch_consistency: {0}'.format(err))
                    if transitive:
                        err.set_orig_opt(option)
                        raise err
                    # not transitive: this option is ignored
                    opt_value = None

            # a whole list was retrieved whereas only one index is checked
            if not option == opt and opt_value is not None and index is not None and \
                    (context is undefined or
                     not opt.impl_is_master_slaves('slave')):
                if len(opt_value) <= index:
                    opt_value = opt.impl_getdefault_multi()
                else:
                    opt_value = opt_value[index]

            if opt_value is not None and opt.impl_is_multi() and index is None and func not in ALLOWED_CONST_LIST:
                if length is not None and length != len(opt_value):
                    if context is undefined:
                        return
                    raise ValueError(_('unexpected length of "{}" in constency "{}", should be "{}"'
                                       '').format(len(opt_value),
                                                  opt.impl_get_display_name(),
                                                  length))
                else:
                    length = len(opt_value)
                    is_multi = True
            else:
                is_multi = False
            if isinstance(opt_value, list) and func in ALLOWED_CONST_LIST:
                # flatten: one (value, option) entry per single value
                for value_ in opt_value:
                    if isinstance(value_, list):
                        for val in value_:
                            all_cons_vals.append((False, val))
                            all_cons_opts.append(opt)
                    else:
                        all_cons_vals.append((False, value_))
                        all_cons_opts.append(opt)
            else:
                all_cons_vals.append((is_multi, opt_value))
                all_cons_opts.append(opt)
        else:
            # the loop has no `break`: this runs once every value is
            # collected (the early `return` above skips it)
            try:
                all_values = []
                if length is None:
                    all_value = []
                    for is_multi, values in all_cons_vals:
                        all_value.append(values)
                    all_values = [all_value]
                else:
                    # one set of values per index of the multi options
                    for idx in range(length):
                        all_value = []
                        for is_multi, values in all_cons_vals:
                            if not is_multi:
                                all_value.append(values)
                            else:
                                all_value.append(values[idx])
                        all_values.append(all_value)
                for values in all_values:
                    getattr(self, func)(current_opt, all_cons_opts, values, warnings_only)
            except ValueError as err:
                if warnings_only:
                    msg = _('attention, "{0}" could be an invalid {1} for "{2}", {3}'
                            '').format(value,
                                       self._display_name,
                                       current_opt.impl_get_display_name(),
                                       err)
                    warnings.warn_explicit(ValueWarning(msg, self),
                                           ValueWarning,
                                           self.__class__.__name__, 0)
                else:
                    raise err

    def _cons_not_equal(self, current_opt, opts, vals, warnings_only):
        """'not_equal' consistency: all the values must be different.

        :param current_opt: option being validated, never listed in the
                            error message
        :param opts: options, in the same order as `vals`
        :param vals: values to compare, `None` values are ignored
        :param warnings_only: choose the "should" wording of the message
        :raises ValueError: if an option other than `current_opt` has a
                            value equal to another one
        """
        equal = []
        is_current = False
        for idx_inf, val_inf in enumerate(vals):
            for idx_sup, val_sup in enumerate(vals[idx_inf + 1:]):
                # chained comparison: equal values that are not None
                if val_inf == val_sup is not None:
                    for opt_ in [opts[idx_inf], opts[idx_inf + idx_sup + 1]]:
                        if opt_ == current_opt:
                            is_current = True
                        else:
                            if opt_ not in equal:
                                equal.append(opt_)
        if equal:
            if debug:  # pragma: no cover
                log.debug(_('_cons_not_equal: {} are not different').format(display_list(equal)))
            if is_current:
                if warnings_only:
                    msg = _('should be different from the value of {}')
                else:
                    msg = _('must be different from the value of {}')
            else:
                if warnings_only:
                    msg = _('value for {} should be different')
                else:
                    msg = _('value for {} must be different')
            equal_name = [opt.impl_get_display_name() for opt in equal]
            raise ValueError(msg.format(display_list(equal_name)))

    def _second_level_validation(self, value, warnings_only):
        "extra validation hook: does nothing here, overridden by subclasses"
        pass

    def impl_getdefault_multi(self):
        "accessing the default value for a multi"
        if self.impl_is_submulti():
            default_value = []
        else:
            default_value = None
        return getattr(self, '_default_multi', default_value)

    def _validate_calculator(self, callback, callback_params):
        """callback_params:
        * None
        * {'': ((option, permissive),), 'ip': ((None,), (option, permissive))

        :raises ValueError: if a callback is given whereas the option has a
                            default or a default_multi value
        """
        if callback is None:
            return
        default_multi = getattr(self, '_default_multi', None)
        is_multi = self.impl_is_multi()
        default = self.impl_getdefault()
        if (not is_multi and (default is not None or default_multi is not None)) or \
                (is_multi and (default != [] or default_multi is not None)):
            raise ValueError(_('default value not allowed if option "{0}" '
                               'is calculated').format(self.impl_getname()))

    def impl_getdefault(self):
        "accessing the default value"
        is_multi = self.impl_is_multi()
        default = getattr(self, '_default', undefined)
        if default is undefined:
            if is_multi:
                default = []
            else:
                default = None
        else:
            if is_multi:
                # a new list is built from the stored tuple at each call
                default = list(default)
        return default

    def _get_extra(self, key):
        """Return the extra value stored for `key`.

        `_extra` is either a `(keys, values)` tuple of sequences or a
        mapping.
        """
        extra = self._extra
        if isinstance(extra, tuple):
            return extra[1][extra[0].index(key)]
        else:
            return extra[key]

    def impl_is_submulti(self):
        "True if the option is a submulti (`_multi` equal to 2)"
        return getattr(self, '_multi', 1) == 2

    def impl_allow_empty_list(self):
        "value of `_allow_empty_list`, `undefined` when it was never set"
        return getattr(self, '_allow_empty_list', undefined)

    #____________________________________________________________
    # consistency
    def _add_consistency(self, func, all_cons_opts, params):
        """Register a consistency as `(None, func, all_cons_opts, params)`.

        The first item stands for the consistency id, unknown at this stage.
        """
        cons = (None, func, all_cons_opts, params)
        consistencies = getattr(self, '_consistencies', None)
        if consistencies is None:
            self._consistencies = [cons]
        else:
            consistencies.append(cons)

    def _del_consistency(self):
        "remove the last registered consistency"
        self._consistencies.pop(-1)

    def _get_consistencies(self):
        "registered consistencies, `STATIC_TUPLE` when there is none"
        return getattr(self, '_consistencies', STATIC_TUPLE)

    def _has_consistencies(self, context):
        "True if this option is in the consistencies cache of the context"
        if context is undefined:
            return False
        descr = context.cfgimpl_get_description()
        if descr._cache_consistencies is None:
            return False
        return self in descr._cache_consistencies


class RegexpOption(Option):
    """Option whose value must match the `_regexp` pattern.

    `_regexp` is not defined in this class, subclasses are expected to
    provide it.
    """
    __slots__ = tuple()

    def _validate(self, value, *args, **kwargs):
        """Check that `value` is a string matched by `_regexp`.

        :raises ValueError: (without message) if the pattern is not found
        """
        err = self._impl_valid_string(value)
        if err:
            return err
        match = self._regexp.search(value)
        if not match:
            raise ValueError()