# -*- coding: utf-8 -*- "option types and option description" # Copyright (C) 2012-2017 Team tiramisu (see AUTHORS for all contributors) # # This program is free software: you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by the # Free Software Foundation, either version 3 of the License, or (at your # option) any later version. # # This program is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more # details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . # # The original `Config` design model is unproudly borrowed from # the rough pypy's guys: http://codespeak.net/svn/pypy/dist/pypy/config/ # the whole pypy projet is under MIT licence # ____________________________________________________________ import warnings import sys import weakref from .baseoption import OnlyOption, submulti, DynSymLinkOption, validate_callback, STATIC_TUPLE from ..i18n import _ from ..setting import log, undefined, debug from ..autolib import carry_out_calculation from ..error import (ConfigError, ValueWarning, PropertiesOptionError, display_list) from itertools import combinations ALLOWED_CONST_LIST = ['_cons_not_equal'] if sys.version_info[0] >= 3: # pragma: no cover xrange = range class Option(OnlyOption): """ Abstract base class for configuration option's. Reminder: an Option object is **not** a container for the value. """ __slots__ = ('_extra', '_warnings_only', '_allow_empty_list', #multi '_multi', '_unique', #value '_default', '_default_multi', #calcul '_val_call', # '_master_slaves', '_choice_values', '_choice_values_params', ) _empty = '' def __init__(self, name, doc, default=undefined, default_multi=None, requires=None, multi=False, unique=undefined, callback=None, callback_params=None, validator=None, validator_params=None, properties=None, warnings_only=False, extra=None, allow_empty_list=undefined): _setattr = object.__setattr__ if not multi and default_multi is not None: raise ValueError(_("default_multi is set whereas multi is False" " in option: {0}").format(name)) if default is undefined: if multi is False: default = None else: default = [] if multi is True: is_multi = True _multi = 0 elif multi is False: is_multi = False _multi = 1 elif multi is submulti: is_multi = True _multi = submulti else: raise ValueError(_('invalid multi value')) if _multi != 1: _setattr(self, '_multi', _multi) if multi is not False and default is None: default = [] if validator is not None: if multi: # and validator_params is None: validator_params = self._build_validator_params(validator, validator_params) validate_callback(validator, validator_params, 'validator', self) if validator_params is None: val_call = (validator,) else: val_call = (validator, validator_params) self._val_call = (val_call, None) if extra is not None: _setattr(self, '_extra', extra) if unique != undefined and not isinstance(unique, bool): raise ValueError(_('unique must be a boolean')) if not is_multi and unique is True: raise ValueError(_('unique must be set only with multi value')) if warnings_only is True: _setattr(self, '_warnings_only', warnings_only) if allow_empty_list is not undefined: _setattr(self, '_allow_empty_list', allow_empty_list) super(Option, self).__init__(name, doc, requires=requires, properties=properties, is_multi=is_multi) if is_multi and default_multi is not None: def test_multi_value(value): err = self._validate(value) if err: raise ValueError(_("invalid default_multi value {0} " "for option {1}: {2}").format( str(value), self.impl_getname(), str(err))) if _multi is submulti: if not isinstance(default_multi, list): raise ValueError(_("invalid default_multi value {0} " "for option {1}: must be a list for a submulti").format( str(default_multi), self.impl_getname())) for value in default_multi: test_multi_value(value) else: test_multi_value(default_multi) _setattr(self, '_default_multi', default_multi) if unique is not undefined: _setattr(self, '_unique', unique) err = self.impl_validate(default, is_multi=is_multi) if err: raise err if (is_multi and default != []) or \ (not is_multi and default is not None): if is_multi: default = tuple(default) _setattr(self, '_default', default) self.impl_set_callback(callback, callback_params, _init=True) def impl_is_multi(self): return getattr(self, '_multi', 1) != 1 def _launch_consistency(self, current_opt, func, option, value, context, index, opts, warnings_only, transitive, setting_properties): """Launch consistency now :param func: function name, this name should start with _cons_ :type func: `str` :param option: option that value is changing :type option: `tiramisu.option.Option` :param value: new value of this opion :param context: Config's context, if None, check default value instead :type context: `tiramisu.config.Config` :param index: only for multi option, consistency should be launch for specified index :type index: `int` :param opts: all options concerne by this consistency :type opts: `list` of `tiramisu.option.Option` :param warnings_only: specific raise error for warning :type warnings_only: `boolean` :param transitive: propertyerror is transitive :type transitive: `boolean` """ if context is not undefined: descr = context.cfgimpl_get_description() all_cons_vals = [] all_cons_opts = [] val_consistencies = True for wopt in opts: opt = wopt() if (isinstance(opt, DynSymLinkOption) and option._dyn == opt._dyn) or \ option == opt: # option is current option # we have already value, so use it all_cons_vals.append(value) all_cons_opts.append(opt) else: path = None is_multi = opt.impl_is_multi() and not opt.impl_is_master_slaves() #if context, calculate value, otherwise get default value if context is not undefined: if isinstance(opt, DynSymLinkOption): path = opt.impl_getpath(context) else: path = descr.impl_get_path_by_opt(opt) if is_multi: _index = None else: _index = index try: opt_value = context.getattr(path, setting_properties, validate=False, index=_index, force_permissive=True) except PropertiesOptionError as err: if debug: # pragma: no cover log.debug('propertyerror in _launch_consistency: {0}'.format(err)) if transitive: err.set_orig_opt(option) raise err else: opt_value = None #elif index is None: else: opt_value = opt.impl_getdefault() if index is not None: if len(opt_value) >= index: opt_value = opt.impl_getdefault_multi() else: opt_value = opt_value[index] if self.impl_is_multi() and index is None: # only check propertyerror for master/slaves is transitive val_consistencies = False if is_multi and isinstance(opt_value, list): all_cons_vals.extend(opt_value) for len_ in xrange(len(opt_value)): all_cons_opts.append(opt) else: all_cons_vals.append(opt_value) all_cons_opts.append(opt) if val_consistencies: try: getattr(self, func)(current_opt, all_cons_opts, all_cons_vals, warnings_only) except ValueError as err: if warnings_only: msg = _('attention, "{0}" could be an invalid {1} for "{2}", {3}' '').format(value, self._display_name, current_opt.impl_get_display_name(), err) warnings.warn_explicit(ValueWarning(msg, self), ValueWarning, self.__class__.__name__, 0) else: raise err def impl_is_unique(self): return getattr(self, '_unique', False) def impl_get_validator(self): val = getattr(self, '_val_call', (None,))[0] if val is None: ret_val = (None, {}) elif len(val) == 1: ret_val = (val[0], {}) else: ret_val = val return ret_val def impl_validate(self, value, context=undefined, validate=True, force_index=None, current_opt=undefined, is_multi=None, display_error=True, display_warnings=True, multi=None, setting_properties=undefined): """ :param value: the option's value :param context: Config's context :type context: :class:`tiramisu.config.Config` :param validate: if true enables ``self._validator`` validation :type validate: boolean :param force_index: if multi, value has to be a list not if force_index is not None :type force_index: integer """ if not validate: return if current_opt is undefined: current_opt = self if display_warnings and setting_properties is undefined and context is not undefined: setting_properties = context.cfgimpl_get_settings()._getproperties(read_write=False) display_warnings = display_warnings and (setting_properties is undefined or 'warnings' in setting_properties) def _is_not_unique(value): if display_error and self.impl_is_unique() and len(set(value)) != len(value): for idx, val in enumerate(value): if val in value[idx+1:]: raise ValueError(_('invalid value "{}", this value is already in "{}"' '').format(val, self.impl_get_display_name())) def calculation_validator(val, _index): validator, validator_params = self.impl_get_validator() if validator is not None: if validator_params != {}: validator_params_ = {} for val_param, values in validator_params.items(): validator_params_[val_param] = values #inject value in calculation if '' in validator_params_: lst = list(validator_params_['']) lst.insert(0, val) validator_params_[''] = tuple(lst) else: validator_params_[''] = (val,) else: validator_params_ = {'': (val,)} # Raise ValueError if not valid value = carry_out_calculation(current_opt, context=context, callback=validator, callback_params=validator_params_, setting_properties=setting_properties, index=_index, is_validator=True) if isinstance(value, Exception): return value def do_validation(_value, _index): if _value is None: error = None else: if display_error: # option validation err = self._validate(_value, context, current_opt) if err: if debug: # pragma: no cover log.debug('do_validation: value: {0}, index: {1}:' ' {2}'.format(_value, _index), exc_info=True) err_msg = '{0}'.format(err) if err_msg: msg = _('"{0}" is an invalid {1} for "{2}", {3}' '').format(_value, self._display_name, self.impl_get_display_name(), err_msg) else: msg = _('"{0}" is an invalid {1} for "{2}"' '').format(_value, self._display_name, self.impl_get_display_name()) raise ValueError(msg) error = None is_warnings_only = getattr(self, '_warnings_only', False) if ((display_error and not is_warnings_only) or (display_warnings and is_warnings_only)): error = calculation_validator(_value, _index) if not error: error = self._second_level_validation(_value, is_warnings_only) if error: if debug: # pragma: no cover log.debug(_('do_validation for {0}: error in value').format( self.impl_getname()), exc_info=True) if is_warnings_only: msg = _('attention, "{0}" could be an invalid {1} for "{2}", {3}' '').format(_value, self._display_name, self.impl_get_display_name(), error) warnings.warn_explicit(ValueWarning(msg, self), ValueWarning, self.__class__.__name__, 0) error = None if error is None: # if context launch consistency validation #if context is not undefined: ret = self._valid_consistency(current_opt, _value, context, _index, display_warnings, display_error, setting_properties) if isinstance(ret, ValueError): error = ret elif ret: return ret if error: err_msg = '{0}'.format(error) if err_msg: msg = _('"{0}" is an invalid {1} for "{2}", {3}' '').format(_value, self._display_name, self.impl_get_display_name(), err_msg) else: msg = _('"{0}" is an invalid {1} for "{2}"' '').format(_value, self._display_name, self.impl_get_display_name()) raise ValueError(msg) if is_multi is None: is_multi = self.impl_is_multi() if not is_multi: return do_validation(value, None) elif force_index is not None: if self.impl_is_submulti(): err = _is_not_unique(value) if err: return err if not isinstance(value, list): raise ValueError(_('invalid value "{0}" for "{1}" which' ' must be a list').format( value, self.impl_get_display_name())) for idx, val in enumerate(value): if isinstance(val, list): # pragma: no cover raise ValueError(_('invalid value "{}" for "{}" ' 'which must not be a list').format(val, self.impl_get_display_name())) err = do_validation(val, force_index) if err: return err else: if multi is not None and self.impl_is_unique() and value in multi: if not self.impl_is_submulti() and len(multi) - 1 >= force_index: lst = list(multi) lst.pop(force_index) else: lst = multi if value in lst: raise ValueError(_('invalid value "{}", this value is already' ' in "{}"').format(value, self.impl_get_display_name())) return do_validation(value, force_index) elif not isinstance(value, list): raise ValueError(_('invalid value "{0}" for "{1}" which ' 'must be a list').format(value, self.impl_getname())) elif self.impl_is_submulti(): for idx, val in enumerate(value): err = _is_not_unique(val) if err: return err if not isinstance(val, list): raise ValueError(_('invalid value "{0}" for "{1}" ' 'which must be a list of list' '').format(val, self.impl_getname())) for slave_val in val: err = do_validation(slave_val, idx) if err: return err else: err = _is_not_unique(value) if err: return err for idx, val in enumerate(value): err = do_validation(val, idx) if err: return err return self._valid_consistency(current_opt, None, context, None, display_warnings, display_error, setting_properties) def impl_is_dynsymlinkoption(self): return False def impl_is_master_slaves(self, type_='both'): """FIXME """ master_slaves = self.impl_get_master_slaves() if master_slaves is not None: if type_ in ('both', 'master') and \ master_slaves.is_master(self): return True if type_ in ('both', 'slave') and \ not master_slaves.is_master(self): return True return False def impl_get_master_slaves(self): masterslave = getattr(self, '_master_slaves', None) if masterslave is None: return masterslave return masterslave() def impl_getdoc(self): "accesses the Option's doc" return self.impl_get_information('doc') def _valid_consistencies(self, other_opts, init=True, func=None): if self._is_subdyn(): dynod = self._subdyn() else: dynod = None if self.impl_is_submulti(): raise ConfigError(_('cannot add consistency with submulti option')) is_multi = self.impl_is_multi() for wopt in other_opts: if isinstance(wopt, weakref.ReferenceType): opt = wopt() else: opt = wopt if opt.impl_is_submulti(): raise ConfigError(_('cannot add consistency with submulti option')) if not isinstance(opt, Option): raise ConfigError(_('consistency must be set with an option, not {}').format(opt)) if opt._is_subdyn(): if dynod is None: raise ConfigError(_('almost one option in consistency is ' 'in a dynoptiondescription but not all')) subod = opt._subdyn() if dynod != subod: raise ConfigError(_('option in consistency must be in same' ' dynoptiondescription')) dynod = subod elif dynod is not None: raise ConfigError(_('almost one option in consistency is in a ' 'dynoptiondescription but not all')) if self is opt: raise ConfigError(_('cannot add consistency with itself')) if is_multi != opt.impl_is_multi(): raise ConfigError(_('every options in consistency must be ' 'multi or none')) if init: # FIXME if func != 'not_equal': opt._has_dependency = True def impl_add_consistency(self, func, *other_opts, **params): """Add consistency means that value will be validate with other_opts option's values. :param func: function's name :type func: `str` :param other_opts: options used to validate value :type other_opts: `list` of `tiramisu.option.Option` :param params: extra params (warnings_only and transitive are allowed) """ if self.impl_is_readonly(): raise AttributeError(_("'{0}' ({1}) cannot add consistency, option is" " read-only").format( self.__class__.__name__, self.impl_getname())) self._valid_consistencies(other_opts, func=func) func = '_cons_{0}'.format(func) if func not in dir(self): raise ConfigError(_('consistency {0} not available for this option').format(func)) options = [weakref.ref(self)] for option in other_opts: options.append(weakref.ref(option)) all_cons_opts = tuple(options) unknown_params = set(params.keys()) - set(['warnings_only', 'transitive']) if unknown_params != set(): raise ValueError(_('unknown parameter {0} in consistency').format(unknown_params)) self._add_consistency(func, all_cons_opts, params) #validate default value when add consistency err = self.impl_validate(self.impl_getdefault()) if err: self._del_consistency() raise err if func != '_cons_not_equal': #consistency could generate warnings or errors self._has_dependency = True for wopt in all_cons_opts: opt = wopt() if func in ALLOWED_CONST_LIST: if getattr(opt, '_unique', undefined) == undefined: opt._unique = True if opt != self: self._add_dependency(opt) opt._add_dependency(self) def _valid_consistency(self, option, value, context, index, display_warnings, display_error, setting_properties): if context is not undefined: descr = context.cfgimpl_get_description() if descr._cache_consistencies is None: return #consistencies is something like [('_cons_not_equal', (opt1, opt2))] if isinstance(option, DynSymLinkOption): consistencies = descr._cache_consistencies.get(option.impl_getopt()) else: consistencies = descr._cache_consistencies.get(option) else: consistencies = option._get_consistencies() if consistencies is not None: for func, all_cons_opts, params in consistencies: warnings_only = params.get('warnings_only', False) if (warnings_only and display_warnings) or (not warnings_only and display_error): transitive = params.get('transitive', True) #all_cons_opts[0] is the option where func is set if isinstance(option, DynSymLinkOption): subpath = '.'.join(option._dyn.split('.')[:-1]) namelen = len(option.impl_getopt().impl_getname()) suffix = option.impl_getname()[namelen:] opts = [] for opt in all_cons_opts: opts.append(DynSymLinkOption(opt, subpath, suffix)) else: opts = all_cons_opts err = opts[0]()._launch_consistency(self, func, option, value, context, index, opts, warnings_only, transitive, setting_properties) if err: return err def _cons_not_equal(self, current_opt, opts, vals, warnings_only): equal = set() is_current = False for idx_inf, val_inf in enumerate(vals): for idx_sup, val_sup in enumerate(vals[idx_inf + 1:]): if val_inf == val_sup is not None: for opt_ in [opts[idx_inf], opts[idx_inf + idx_sup + 1]]: if opt_ == current_opt: is_current = True else: equal.add(opt_) if equal: if debug: # pragma: no cover log.debug(_('_cons_not_equal: {} are not different').format(display_list(list(equal)))) if is_current: if warnings_only: msg = _('should be different from the value of {}') else: msg = _('must be different from the value of {}') else: if warnings_only: msg = _('value for {} should be different') else: msg = _('value for {} must be different') equal_name = [] for opt in equal: equal_name.append(opt.impl_get_display_name()) raise ValueError(msg.format(display_list(list(equal_name)))) def _second_level_validation(self, value, warnings_only): pass def impl_getdefault_multi(self): "accessing the default value for a multi" if self.impl_is_submulti(): default_value = [] else: default_value = None return getattr(self, '_default_multi', default_value) def _validate_callback(self, callback, callback_params): """callback_params: * None * {'': ((option, permissive),), 'ip': ((None,), (option, permissive)) """ if callback is None: return default_multi = getattr(self, '_default_multi', None) is_multi = self.impl_is_multi() default = self.impl_getdefault() if (not is_multi and (default is not None or default_multi is not None)) or \ (is_multi and (default != [] or default_multi is not None)): raise ValueError(_('default value not allowed if option "{0}" ' 'is calculated').format(self.impl_getname())) def impl_getdefault(self): "accessing the default value" is_multi = self.impl_is_multi() default = getattr(self, '_default', undefined) if default is undefined: if is_multi: default = [] else: default = None else: if is_multi: default = list(default) return default def _get_extra(self, key): extra = self._extra if isinstance(extra, tuple): return extra[1][extra[0].index(key)] else: return extra[key] def impl_is_submulti(self): return getattr(self, '_multi', 1) == 2 def impl_allow_empty_list(self): return getattr(self, '_allow_empty_list', undefined) #____________________________________________________________ # consistency def _add_consistency(self, func, all_cons_opts, params): cons = (func, all_cons_opts, params) consistencies = getattr(self, '_consistencies', None) if consistencies is None: self._consistencies = [cons] else: consistencies.append(cons) def _del_consistency(self): self._consistencies.pop(-1) def _get_consistencies(self): return getattr(self, '_consistencies', STATIC_TUPLE) def _has_consistencies(self): return hasattr(self, '_consistencies') class RegexpOption(Option): __slots__ = tuple() def _validate(self, value, context=undefined, current_opt=undefined): err = self._impl_valid_unicode(value) if err: return err match = self._regexp.search(value) if not match: raise ValueError()