tiramisu/tiramisu/option/baseoption.py

597 lines
24 KiB
Python

# -*- coding: utf-8 -*-
# Copyright (C) 2014-2018 Team tiramisu (see AUTHORS for all contributors)
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by the
# Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# The original `Config` design model is unproudly borrowed from
# the rough pypy's guys: http://codespeak.net/svn/pypy/dist/pypy/config/
# the whole pypy projet is under MIT licence
# ____________________________________________________________
import re
from types import FunctionType
from typing import FrozenSet, Callable, Tuple, Set, Optional, Union, Any, List
import weakref
from inspect import signature
from itertools import chain
from ..i18n import _
from ..setting import undefined, Settings
from ..value import Values
from ..error import ConfigError, display_list
from ..function import Params, ParamContext, ParamOption, ParamIndex
STATIC_TUPLE = frozenset()
submulti = 2
NAME_REGEXP = re.compile(r'^[a-zA-Z][a-zA-Z\d_-]*$')
def valid_name(name):
if not isinstance(name, str):
return False
return True
# return re.match(NAME_REGEXP, name) is not None
#____________________________________________________________
#
class Base:
"""Base use by all *Option* classes (Option, OptionDescription, SymLinkOption, ...)
"""
__slots__ = ('_name',
'_path',
'_informations',
#calcul
'_subdyn',
'_requires',
'_properties',
'_calc_properties',
#
'_consistencies',
#other
'_has_dependency',
'_dependencies',
'_has_calc_context',
'__weakref__'
)
def __init__(self,
name: str,
doc: str,
requires=None,
properties=None,
is_multi: bool=False) -> None:
if not valid_name(name):
raise ValueError(_('"{0}" is an invalid name for an option').format(name))
if requires is not None:
calc_properties, requires = validate_requires_arg(self,
is_multi,
requires,
name)
else:
calc_properties = frozenset()
requires = undefined
if properties is None:
properties = frozenset()
if isinstance(properties, tuple):
properties = frozenset(properties)
if is_multi and 'empty' not in properties:
# if option is a multi, it cannot be "empty" (None not allowed in the list)
# "empty" is removed for slave's option
properties = properties | {'empty'}
if not isinstance(properties, frozenset):
raise TypeError(_('invalid properties type {0} for {1},'
' must be a frozenset').format(type(properties),
name))
self.validate_properties(name,
calc_properties,
properties)
_setattr = object.__setattr__
_setattr(self, '_name', name)
_setattr(self, '_informations', {'doc': doc})
if calc_properties is not undefined:
_setattr(self, '_calc_properties', calc_properties)
if requires is not undefined:
_setattr(self, '_requires', requires)
if properties:
_setattr(self, '_properties', properties)
def validate_properties(self,
name: str,
calc_properties: FrozenSet[str],
properties: FrozenSet[str]) -> None:
set_forbidden_properties = calc_properties & properties
if set_forbidden_properties != frozenset():
raise ValueError(_('conflict: properties already set in requirement {0} for {1}'
'').format(display_list(set_forbidden_properties),
name))
def _get_function_args(self,
function: Callable) -> Tuple[Set[str], Set[str], bool, bool]:
args = set()
kwargs = set()
positional = False
keyword = False
for param in signature(function).parameters.values():
if param.kind == param.VAR_POSITIONAL:
positional = True
elif param.kind == param.VAR_KEYWORD:
keyword = True
elif param.default is param.empty:
args.add(param.name)
else:
kwargs.add(param.name)
return args, kwargs, positional, keyword
def _get_parameters_args(self,
calculator_params: Optional[Params],
add_value: bool) -> Tuple[Set[str], Set[str]]:
args = set()
kwargs = set()
# add value as first argument
if add_value:
args.add('value')
if self.impl_is_dynoptiondescription():
kwargs.add('suffix')
if calculator_params:
for idx in range(len(calculator_params.args)):
# construct an appropriate name
args.add('param{}'.format(idx))
for param in calculator_params.kwargs:
kwargs.add(param)
return args, kwargs
def _build_calculator_params(self,
calculator: Callable,
calculator_params: Optional[Params],
type_: str,
add_value: bool=False) -> Union[None, Params]:
"""
:add_value: add value as first argument for validator
"""
assert isinstance(calculator, FunctionType), _('{0} must be a function').format(type_)
if calculator_params is not None:
assert isinstance(calculator_params, Params), _('{0}_params must be a params'
'').format(type_)
for param in chain(calculator_params.args, calculator_params.kwargs.values()):
if isinstance(param, ParamContext):
self._has_calc_context = True
elif isinstance(param, ParamOption):
param.option._add_dependency(self)
if type_ == 'validator':
self._has_dependency = True
is_multi = self.impl_is_dynoptiondescription() or self.impl_is_multi()
func_args, func_kwargs, func_positional, func_keyword = self._get_function_args(calculator)
calculator_args, calculator_kwargs = self._get_parameters_args(calculator_params, add_value)
# remove knowned kwargs
common_kwargs = func_kwargs & calculator_kwargs
func_kwargs -= common_kwargs
calculator_kwargs -= common_kwargs
# remove knowned calculator's kwargs in func's args
common = func_args & calculator_kwargs
func_args -= common
calculator_kwargs -= common
# remove unknown calculator's args in func's args
for idx in range(min(len(calculator_args), len(func_args))):
func_args.pop()
calculator_args.pop()
# remove unknown calculator's args in func's kwargs
if is_multi:
func_kwargs_left = func_kwargs - {'index', 'self'}
else:
func_kwargs_left = func_kwargs
func_kwargs_pop = set()
for idx in range(min(len(calculator_args), len(func_kwargs_left))):
func_kwargs_pop.add(func_kwargs_left.pop())
calculator_args.pop()
func_kwargs -= func_kwargs_pop
# func_positional or keyword is True, so assume all args or kwargs are satisfy
if func_positional:
calculator_args = set()
if func_keyword:
calculator_kwargs = set()
if calculator_args or calculator_kwargs:
# there is more args/kwargs than expected!
raise ConfigError(_('cannot find those arguments "{}" in function "{}" for "{}"'
'').format(display_list(list(calculator_args | calculator_kwargs)),
calculator.__name__,
self.impl_get_display_name()))
has_index = False
if is_multi and func_args and not self.impl_is_dynoptiondescription():
if calculator_params is None:
calculator_params = Params()
params = list(calculator_params.args)
if add_value:
# only for validator
params.append(ParamOption(self))
func_args.pop()
if func_args:
has_index = True
params.append(ParamIndex())
func_args.pop()
calculator_params.args = tuple(params)
if func_args:
raise ConfigError(_('missing those arguments "{}" in function "{}" for "{}"'
'').format(display_list(list(func_args)),
calculator.__name__,
self.impl_get_display_name()))
if not self.impl_is_dynoptiondescription() and is_multi and \
not has_index and 'index' in func_kwargs:
calculator_params.kwargs['index'] = ParamIndex()
return calculator_params
def impl_has_dependency(self,
self_is_dep: bool=True) -> bool:
if self_is_dep is True:
return getattr(self, '_has_dependency', False)
return hasattr(self, '_dependencies')
def _get_dependencies(self,
context_od) -> Set[str]:
ret = set(getattr(self, '_dependencies', STATIC_TUPLE))
if context_od and hasattr(context_od, '_dependencies'):
# if context is set in options, add those options
return set(context_od._dependencies) | ret
return ret
def _add_dependency(self,
option) -> None:
options = self._get_dependencies(None)
options.add(weakref.ref(option))
self._dependencies = tuple(options)
def _impl_set_callback(self,
callback: Callable,
callback_params: Optional[Params]=None) -> None:
if callback is None and callback_params is not None:
raise ValueError(_("params defined for a callback function but "
"no callback defined"
' yet for option "{0}"').format(
self.impl_getname()))
self._validate_calculator(callback,
callback_params)
if callback is not None:
callback_params = self._build_calculator_params(callback,
callback_params,
'callback')
# first part is validator
val = getattr(self, '_val_call', (None,))[0]
if not callback_params:
val_call = (callback,)
else:
val_call = (callback, callback_params)
self._val_call = (val, val_call)
def impl_is_optiondescription(self) -> bool:
return False
def impl_is_dynoptiondescription(self) -> bool:
return False
def impl_getname(self) -> str:
return self._name
def _set_readonly(self) -> None:
if not self.impl_is_readonly():
_setattr = object.__setattr__
dico = self._informations
keys = tuple(dico.keys())
if len(keys) == 1:
dico = dico['doc']
else:
dico = tuple([keys, tuple(dico.values())])
_setattr(self, '_informations', dico)
extra = getattr(self, '_extra', None)
if extra is not None:
_setattr(self, '_extra', tuple([tuple(extra.keys()), tuple(extra.values())]))
def impl_is_readonly(self) -> str:
# _path is None when initialise SymlinkOption
return hasattr(self, '_path') and self._path is not None
def impl_getproperties(self) -> FrozenSet[str]:
return getattr(self, '_properties', frozenset())
def _setsubdyn(self,
subdyn) -> None:
self._subdyn = weakref.ref(subdyn)
def issubdyn(self) -> bool:
return getattr(self, '_subdyn', None) is not None
def getsubdyn(self):
return self._subdyn()
def impl_getrequires(self):
return getattr(self, '_requires', STATIC_TUPLE)
def impl_get_callback(self):
call = getattr(self, '_val_call', (None, None))[1]
if call is None:
ret_call = (None, None)
elif len(call) == 1:
ret_call = (call[0], None)
else:
ret_call = call
return ret_call
# ____________________________________________________________
# information
def impl_get_information(self,
key: str,
default: Any=undefined) -> Any:
"""retrieves one information's item
:param key: the item string (ex: "help")
"""
dico = self._informations
if isinstance(dico, tuple):
if key in dico[0]:
return dico[1][dico[0].index(key)]
elif isinstance(dico, str):
if key == 'doc':
return dico
elif isinstance(dico, dict):
if key in dico:
return dico[key]
if default is not undefined:
return default
raise ValueError(_("information's item not found: {0}").format(
key))
def impl_set_information(self,
key: str,
value: Any) -> None:
"""updates the information's attribute
(which is a dictionary)
:param key: information's key (ex: "help", "doc"
:param value: information's value (ex: "the help string")
"""
if self.impl_is_readonly():
raise AttributeError(_("'{0}' ({1}) object attribute '{2}' is"
" read-only").format(self.__class__.__name__,
self,
#self.impl_getname(),
key))
self._informations[key] = value
class BaseOption(Base):
"""This abstract base class stands for attribute access
in options that have to be set only once, it is of course done in the
__setattr__ method
"""
__slots__ = tuple()
def __getstate__(self):
raise NotImplementedError()
def __setattr__(self,
name: str,
value: Any) -> Any:
"""set once and only once some attributes in the option,
like `_name`. `_name` cannot be changed once the option is
pushed in the :class:`tiramisu.option.OptionDescription`.
if the attribute `_readonly` is set to `True`, the option is
"frozen" (which has nothing to do with the high level "freeze"
propertie or "read_only" property)
"""
# never change _name in an option or attribute when object is readonly
if self.impl_is_readonly():
raise AttributeError(_('"{}" ({}) object attribute "{}" is'
' read-only').format(self.__class__.__name__,
self.impl_get_display_name(),
name))
super(BaseOption, self).__setattr__(name, value)
def impl_getpath(self) -> str:
return self._path
def impl_has_callback(self) -> bool:
"to know if a callback has been defined or not"
return self.impl_get_callback()[0] is not None
def impl_get_display_name(self,
dyn_name: Base=None) -> str:
name = self.impl_get_information('doc')
if name is None or name == '':
if dyn_name is not None:
name = dyn_name
else:
name = self.impl_getname()
return name
def reset_cache(self,
path: str,
values: Values,
settings: Settings,
resetted_opts: List[Base]) -> None:
settings._p_.delcache(path)
settings._pp_.delcache(path)
if not self.impl_is_optiondescription():
values._p_.delcache(path)
def impl_is_symlinkoption(self) -> bool:
return False
def validate_requires_arg(new_option: BaseOption,
multi: bool,
requires: List[dict],
name: str) -> Tuple[FrozenSet, Tuple]:
"""check malformed requirements
and tranform dict to internal tuple
:param requires: have a look at the
:meth:`tiramisu.setting.Settings.apply_requires` method to
know more about
the description of the requires dictionary
"""
def get_option(require):
option = require['option']
if option == 'self':
option = new_option
if not isinstance(option, BaseOption):
raise ValueError(_('malformed requirements '
'must be an option in option {0}').format(name))
if not multi and option.impl_is_multi():
raise ValueError(_('malformed requirements '
'multi option must not set '
'as requires of non multi option {0}').format(name))
option._add_dependency(new_option)
return option
def _set_expected(action,
inverse,
transitive,
same_action,
option,
expected,
operator):
if inverse not in ret_requires[action]:
ret_requires[action][inverse] = ([(option, [expected])], action, inverse, transitive, same_action, operator)
else:
for exp in ret_requires[action][inverse][0]:
if exp[0] == option:
exp[1].append(expected)
break
else:
ret_requires[action][inverse][0].append((option, [expected]))
def set_expected(require,
ret_requires):
expected = require['expected']
inverse = get_inverse(require)
transitive = get_transitive(require)
same_action = get_sameaction(require)
operator = get_operator(require)
if isinstance(expected, list):
for exp in expected:
if set(exp.keys()) != {'option', 'value'}:
raise ValueError(_('malformed requirements expected must have '
'option and value for option {0}').format(name))
option = get_option(exp)
if option is not None:
try:
option._validate(exp['value'], undefined)
except ValueError as err:
raise ValueError(_('malformed requirements expected value '
'must be valid for option {0}'
': {1}').format(name, err))
_set_expected(action,
inverse,
transitive,
same_action,
option,
exp['value'],
operator)
else:
option = get_option(require)
if expected is not None:
try:
option._validate(expected, undefined)
except ValueError as err:
raise ValueError(_('malformed requirements expected value '
'must be valid for option {0}'
': {1}').format(name, err))
_set_expected(action,
inverse,
transitive,
same_action,
option,
expected,
operator)
def get_action(require):
action = require['action']
if action == 'force_store_value':
raise ValueError(_("malformed requirements for option: {0}"
" action cannot be force_store_value"
).format(name))
return action
def get_inverse(require):
inverse = require.get('inverse', False)
if inverse not in [True, False]:
raise ValueError(_('malformed requirements for option: {0}'
' inverse must be boolean'))
return inverse
def get_transitive(require):
transitive = require.get('transitive', True)
if transitive not in [True, False]:
raise ValueError(_('malformed requirements for option: {0}'
' transitive must be boolean'))
return transitive
def get_sameaction(require):
same_action = require.get('same_action', True)
if same_action not in [True, False]:
raise ValueError(_('malformed requirements for option: {0}'
' same_action must be boolean'))
return same_action
def get_operator(require):
operator = require.get('operator', 'or')
if operator not in ['and', 'or']:
raise ValueError(_('malformed requirements for option: "{0}"'
' operator must be "or" or "and"').format(operator))
return operator
ret_requires = {}
config_action = set()
# start parsing all requires given by user (has dict)
# transforme it to a tuple
for require in requires:
if not isinstance(require, dict):
raise ValueError(_("malformed requirements type for option:"
" {0}, must be a dict").format(name))
valid_keys = ('option', 'expected', 'action', 'inverse', 'transitive',
'same_action', 'operator')
unknown_keys = frozenset(require.keys()) - frozenset(valid_keys)
if unknown_keys != frozenset():
raise ValueError(_('malformed requirements for option: {0}'
' unknown keys {1}, must only '
'{2}').format(name,
unknown_keys,
valid_keys))
# prepare all attributes
if not ('expected' in require and isinstance(require['expected'], list)) and \
not ('option' in require and 'expected' in require) or \
'action' not in require:
raise ValueError(_("malformed requirements for option: {0}"
" require must have option, expected and"
" action keys").format(name))
action = get_action(require)
config_action.add(action)
if action not in ret_requires:
ret_requires[action] = {}
set_expected(require, ret_requires)
# transform dict to tuple
ret = []
for requires in ret_requires.values():
ret_action = []
for require in requires.values():
ret_action.append((tuple(require[0]), require[1],
require[2], require[3], require[4], require[5]))
ret.append(tuple(ret_action))
return frozenset(config_action), tuple(ret)