tiramisu/tiramisu/storage/dictionary/option.py

572 lines
21 KiB
Python

# -*- coding: utf-8 -*-
""
# Copyright (C) 2014 Team tiramisu (see AUTHORS for all contributors)
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# ____________________________________________________________
from ...i18n import _
from ...setting import undefined
from ...error import ConfigError
# Shared immutable fallbacks returned by accessors whose slot is unset
# (e.g. _get_consistencies, impl_getrequires, impl_get_calc_properties).
static_tuple = ()
static_set = frozenset()
#____________________________________________________________
#
# Base
# legacy slot list: ('_name', '_informations', '_multi', '_multitype', '_warnings_only', '_extra', '_readonly', '_subdyn')
class StorageBase(object):
    """In-memory ("dictionary") storage for the attributes of an option.

    Every attribute lives in a slot.  Most slots are only assigned when the
    value differs from its implicit default, so the accessors fall back to
    that default when the slot is unset.
    """
    __slots__ = ('_name',
                 '_informations',
                 '_multi',
                 '_extra',
                 '_warnings_only',
                 '_allow_empty_list',
                 # value
                 '_default',
                 '_default_multi',
                 # calculation
                 '_subdyn',
                 '_requires',
                 '_properties',
                 '_calc_properties',
                 '_val_call',
                 #
                 '_consistencies',
                 '_master_slaves',
                 '_choice_values',
                 '_choice_values_params',
                 # other
                 '_state_master_slaves',
                 '_state_callback',
                 '_state_callback_params',
                 '_state_requires',
                 '_stated',
                 '_state_consistencies',
                 '_state_informations',
                 '_state_extra',
                 '_state_readonly',
                 '__weakref__'
                 )
    # NOTE(review): '_opt' is assigned by __init__ and _impl_setopt but is not
    # declared above; presumably the subclasses that use it declare the slot
    # themselves -- confirm.

    def __init__(self, name, multi, warnings_only, doc, extra, calc_properties,
                 requires, properties, allow_empty_list, opt=undefined):
        """Store the given attributes, skipping those left at their default.

        :param name: option name, always stored
        :param multi: stored only when different from 1
        :param warnings_only: stored only when it is exactly ``True``
        :param doc: stored as the 'doc' information unless ``undefined``
        :param extra: stored unless ``None``
        :param calc_properties: stored unless ``undefined``
        :param requires: stored unless ``undefined``
        :param properties: stored unless ``undefined``
        :param allow_empty_list: stored unless ``undefined``
        :param opt: stored as ``_opt`` unless ``undefined``
        """
        _setattr = object.__setattr__
        _setattr(self, '_name', name)
        if doc is not undefined:
            _setattr(self, '_informations', {'doc': doc})
        if multi != 1:
            _setattr(self, '_multi', multi)
        if extra is not None:
            _setattr(self, '_extra', extra)
        if warnings_only is True:
            _setattr(self, '_warnings_only', warnings_only)
        if calc_properties is not undefined:
            _setattr(self, '_calc_properties', calc_properties)
        if requires is not undefined:
            _setattr(self, '_requires', requires)
        if properties is not undefined:
            _setattr(self, '_properties', properties)
        if opt is not undefined:
            _setattr(self, '_opt', opt)
        if allow_empty_list is not undefined:
            _setattr(self, '_allow_empty_list', allow_empty_list)

    @staticmethod
    def _text_types():
        """Return the string types of the running interpreter.

        ``unicode`` only exists on Python 2; on Python 3 ``str`` covers it.
        """
        try:
            return (str, unicode)
        except NameError:
            return (str,)

    def _set_default_values(self, default, default_multi, is_multi):
        """Store ``default`` and ``default_multi`` when they are meaningful.

        A multi default is stored as a tuple and skipped when it equals
        ``[]``; a non-multi default is skipped when it is ``None``.
        ``default_multi`` is only considered for multi options and is checked
        with ``self._validate`` (not defined in this class) before storing.

        :raises ValueError: if ``default_multi`` is rejected by ``_validate``
        """
        _setattr = object.__setattr__
        if is_multi:
            if default != []:
                _setattr(self, '_default', tuple(default))
        elif default is not None:
            _setattr(self, '_default', default)

        if is_multi and default_multi is not None:
            try:
                self._validate(default_multi)
            except ValueError as err:  # pragma: optional cover
                raise ValueError(_("invalid default_multi value {0} "
                                   "for option {1}: {2}").format(
                                       str(default_multi),
                                       self.impl_getname(), str(err)))
            _setattr(self, '_default_multi', default_multi)

    # information
    def impl_set_information(self, key, value):
        """Update the informations dictionary.

        :param key: information's key (ex: "help", "doc")
        :param value: information's value (ex: "the help string")
        :raises AttributeError: if the object is read-only
        """
        if self.impl_is_readonly():
            raise AttributeError(_("'{0}' ({1}) object attribute '{2}' is"
                                   " read-only").format(
                                       self.__class__.__name__,
                                       self,
                                       key))
        self._informations[key] = value

    def impl_get_information(self, key, default=undefined):
        """Retrieve one information item.

        ``_informations`` has three shapes: a bare string (read-only, only
        the 'doc' entry), a ``(keys, values)`` tuple (read-only) or a dict.

        :param key: the item string (ex: "help")
        :param default: returned when ``key`` is missing, if given
        :raises ValueError: if ``key`` is missing and no default was given
        """
        dico = self._informations
        if isinstance(dico, self._text_types()):
            if key == 'doc':
                return dico
        elif isinstance(dico, tuple):
            try:
                return dico[1][dico[0].index(key)]
            except ValueError:
                pass
        elif default is not undefined:
            # dict
            return dico.get(key, default)
        else:
            # dict
            try:
                return dico[key]
            except KeyError:  # pragma: optional cover
                pass
        if default is not undefined:
            return default
        raise ValueError(_("information's item not found: {0}").format(
            key))

    def _add_consistency(self, func, all_cons_opts, params):
        """Append a ``(func, all_cons_opts, params)`` consistency entry."""
        cons = (func, all_cons_opts, params)
        try:
            self._consistencies.append(cons)
        except AttributeError:
            # first consistency: the slot is not assigned yet
            self._consistencies = [cons]

    def _del_consistency(self):
        """Remove the most recently added consistency entry."""
        self._consistencies.pop(-1)

    def _get_consistencies(self):
        """Return the consistency entries, or an empty tuple when unset."""
        return getattr(self, '_consistencies', static_tuple)

    def _set_callback(self, callback, callback_params):
        """Store the callback in the second element of ``_val_call``.

        The params are dropped when they are ``None`` or an empty dict.  The
        validator (first element) is preserved when it is already set.
        """
        if callback_params is None or callback_params == {}:
            val_call = (callback,)
        else:
            val_call = (callback, callback_params)
        try:
            self._val_call = (self._val_call[0], val_call)
        except AttributeError:
            self._val_call = (None, val_call)

    def _get_val_call(self, index):
        """Return element ``index`` of ``_val_call`` as ``(function, params)``.

        Falls back to ``(None, {})`` when nothing is stored, and supplies an
        empty params dict when only the function was stored.
        """
        try:
            call = self._val_call[index]
        except (AttributeError, IndexError):
            return (None, {})
        if call is None:
            return (None, {})
        if len(call) == 1:
            return (call[0], {})
        return call

    def impl_get_callback(self):
        """Return ``(callback, callback_params)``, or ``(None, {})`` if unset."""
        return self._get_val_call(1)

    def impl_get_calc_properties(self):
        """Return the calculated properties, or an empty frozenset."""
        return getattr(self, '_calc_properties', static_set)

    def impl_getrequires(self):
        """Return the requirements, or an empty tuple when unset."""
        return getattr(self, '_requires', static_tuple)

    def _set_validator(self, validator, validator_params):
        """Store the validator in the first element of ``_val_call``.

        The callback (second element) is preserved when it is already set.
        """
        if validator_params is None:
            val_call = (validator,)
        else:
            val_call = (validator, validator_params)
        try:
            self._val_call = (val_call, self._val_call[1])
        except (AttributeError, IndexError):
            self._val_call = (val_call, None)

    def impl_get_validator(self):
        """Return ``(validator, validator_params)``, or ``(None, {})``."""
        return self._get_val_call(0)

    def _get_id(self):
        """Return ``id(self)``."""
        return id(self)

    def _impl_getsubdyn(self):
        """Return the stored ``_subdyn`` value."""
        return self._subdyn

    def _impl_getopt(self):
        """Return the stored ``_opt`` value."""
        return self._opt

    def _set_readonly(self, has_extra):
        """Freeze the informations (and optionally the extra) storage.

        The informations dict becomes a bare string when it only holds
        'doc', otherwise a ``(keys, values)`` tuple.  Does nothing when the
        object is already read-only.

        :param has_extra: also convert ``_extra`` (if set) to a
                          ``(keys, values)`` tuple
        """
        if self.impl_is_readonly():
            return
        _setattr = object.__setattr__
        infos = self._informations
        keys = tuple(infos.keys())
        # Only collapse to a bare value when the single key really is 'doc':
        # impl_get_information reads the bare form back as 'doc'.
        if keys == ('doc',):
            frozen = infos['doc']
        else:
            frozen = (keys, tuple(infos.values()))
        _setattr(self, '_informations', frozen)
        if has_extra:
            extra = getattr(self, '_extra', None)
            if extra is not None:
                _setattr(self, '_extra',
                         (tuple(extra.keys()), tuple(extra.values())))

    def _impl_setsubdyn(self, subdyn):
        """Store ``subdyn`` in ``_subdyn``."""
        self._subdyn = subdyn

    def _impl_setopt(self, opt):
        """Store ``opt`` in ``_opt``."""
        self._opt = opt

    def _impl_convert_zinformations(self, descr, load=False):
        """Move informations to or from their ``_state_*`` form.

        :param descr: unused here
        :param load: when false, copy the informations (as a dict) and the
                     read-only flag into ``_state_informations`` and
                     ``_state_readonly``; when true, restore them and drop
                     the ``_state_*`` attributes
        """
        if not load:
            infos = self._informations
            if isinstance(infos, tuple):
                self._state_informations = dict(zip(infos[0], infos[1]))
            elif isinstance(infos, self._text_types()):
                self._state_informations = {'doc': infos}
            else:
                self._state_informations = infos
            self._state_readonly = self.impl_is_readonly()
        else:
            try:
                self._informations = self._state_informations
                del(self._state_informations)
            except AttributeError:
                pass
            if self._state_readonly:
                self._set_readonly(True)
            del(self._state_readonly)

    def _impl_convert_extra(self, descr, load=False):
        """Move ``_extra`` to or from its ``_state_extra`` form.

        :param descr: unused here
        :param load: when false, a tuple-shaped ``_extra`` is copied as a
                     dict into ``_state_extra``; when true, ``_state_extra``
                     (if any) is restored into ``_extra`` and removed
        """
        if not load:
            try:
                extra = self._extra
            except AttributeError:
                return
            if isinstance(extra, tuple):
                self._state_extra = dict(zip(extra[0], extra[1]))
        else:
            try:
                self._extra = self._state_extra
                del(self._state_extra)
            except AttributeError:
                pass

    def _impl_getattributes(self):
        """Return the set of slot names declared along the class MRO."""
        slots = set()
        for subclass in self.__class__.__mro__:
            if subclass is not object:
                slots.update(subclass.__slots__)
        return slots

    def impl_is_readonly(self):
        """Tell whether the informations are no longer stored as a dict.

        Returns ``False`` when no informations are stored at all.
        """
        try:
            return not isinstance(self._informations, dict)
        except AttributeError:
            return False

    def impl_getname(self):
        """Return the option name."""
        return self._name

    def impl_is_multi(self):
        """Tell whether ``_multi`` differs from 1 (the unset default)."""
        return getattr(self, '_multi', 1) != 1

    def impl_is_submulti(self):
        """Tell whether ``_multi`` equals 2.

        An unset slot raises AttributeError (not IndexError), so the default
        of 1 is supplied through ``getattr``.
        """
        return getattr(self, '_multi', 1) == 2

    def impl_allow_empty_list(self):
        """Return ``_allow_empty_list``, or ``undefined`` when unset."""
        return getattr(self, '_allow_empty_list', undefined)

    def _get_extra(self, key):
        """Return the extra value for ``key`` from the dict or tuple form."""
        extra = self._extra
        if isinstance(extra, tuple):
            return extra[1][extra[0].index(key)]
        return extra[key]

    def _is_warnings_only(self):
        """Return ``_warnings_only``, or ``False`` when unset."""
        return getattr(self, '_warnings_only', False)

    def impl_getdefault(self):
        """Return the default value.

        Multi options get a fresh list (empty when unset); other options get
        the stored value, or ``None`` when unset.
        """
        is_multi = self.impl_is_multi()
        try:
            ret = self._default
        except AttributeError:
            return [] if is_multi else None
        return list(ret) if is_multi else ret

    def impl_getdefault_multi(self):
        """Return the default value for new items of a multi option.

        Returns ``None`` when the option is not multi or nothing is stored.
        """
        if self.impl_is_multi():
            return getattr(self, '_default_multi', None)
        return None

    def commit(self):
        """Do nothing: this storage has nothing to commit."""
        pass
class StorageOptionDescription(StorageBase):
    """In-memory storage for an option description: its children and the
    option/path cache built from them.
    """
    __slots__ = ('_children', '_cache_paths', '_cache_consistencies',
                 '_group_type', '_state_group_type')

    def __init__(self, name, multi, warnings_only, doc, extra):
        """Initialise the base storage.

        ``extra`` is accepted but not forwarded (``None`` is stored), and
        calc_properties, requires, properties and allow_empty_list are all
        left ``undefined``.
        """
        # StorageBase.__init__ requires allow_empty_list; it was missing from
        # this call.
        super(StorageOptionDescription, self).__init__(name, multi,
                                                       warnings_only, doc,
                                                       None, undefined,
                                                       undefined, undefined,
                                                       undefined)

    def _add_children(self, child_names, children):
        """Store the children as a ``(names, options)`` pair of tuples."""
        _setattr = object.__setattr__
        _setattr(self, '_children', (tuple(child_names), tuple(children)))

    def impl_already_build_caches(self):
        """Tell whether ``_cache_paths`` has been built."""
        return getattr(self, '_cache_paths', None) is not None

    def impl_get_opt_by_path(self, path):
        """Return the cached option stored under ``path``.

        :raises AttributeError: if ``path`` is not in the cache
        """
        try:
            return self._cache_paths[0][self._cache_paths[1].index(path)]
        except ValueError:  # pragma: optional cover
            raise AttributeError(_('no option for path {0}').format(path))

    def impl_get_path_by_opt(self, opt):
        """Return the cached path of ``opt``.

        :raises ConfigError: if the cache has not been built on this object
        :raises AttributeError: if ``opt`` is not in the cache
        """
        if getattr(self, '_cache_paths', None) is None:
            raise ConfigError(_('use impl_get_path_by_opt only with root OptionDescription'))
        try:
            return self._cache_paths[1][self._cache_paths[0].index(opt)]
        except ValueError:  # pragma: optional cover
            raise AttributeError(_('no option {0} found').format(opt))

    def impl_get_group_type(self):  # pragma: optional cover
        """Return the stored ``_group_type``."""
        return self._group_type

    def impl_build_cache_option(self, _currpath=None, cache_path=None,
                                cache_option=None):
        """Build ``_cache_paths``, the ``(options, paths)`` pair of tuples.

        Children are taken from ``self._impl_getchildren(dyn=False)`` (not
        defined in this class) and child descriptions are walked
        recursively.  Only the top-level call (``_currpath`` is ``None``)
        stores the result, and it returns early if a cache already exists.

        :param _currpath: list of names leading to this description;
                          internal, set by the recursion
        :param cache_path: accumulator of dotted paths; internal
        :param cache_option: accumulator of options; internal
        """
        if _currpath is None and getattr(self, '_cache_paths', None) is not None:
            # cache already set
            return
        save = _currpath is None
        if save:
            _currpath = []
        if cache_path is None:
            cache_path = []
            cache_option = []
        for option in self._impl_getchildren(dyn=False):
            attr = option.impl_getname()
            cache_option.append(option)
            cache_path.append(str('.'.join(_currpath + [attr])))
            if option.impl_is_optiondescription():
                _currpath.append(attr)
                option.impl_build_cache_option(_currpath, cache_path,
                                               cache_option)
                _currpath.pop()
        if save:
            _setattr = object.__setattr__
            _setattr(self, '_cache_paths',
                     (tuple(cache_option), tuple(cache_path)))

    def impl_get_options_paths(self, bytype, byname, _subpath, only_first, context):
        """Search the cache for options, optionally by type and/or name.

        Option descriptions are never returned.  Options for which
        ``_is_subdyn()`` is true are expanded (or matched) once per suffix
        given by ``option._subdyn._impl_get_suffixes(context)``.

        :param bytype: class (or tuple of classes) the option must be an
                       instance of, or ``None``
        :param byname: exact name the option must have, or ``None``
        :param _subpath: when not ``None``, only paths starting with
                         ``_subpath + '.'`` are considered
        :param only_first: return the first ``(path, option)`` pair found
                           instead of the list
        :param context: passed to ``_impl_get_suffixes``
        :returns: list of ``(path, option)`` pairs; with ``only_first``, the
                  first pair, or an empty list when nothing matches
        :raises ConfigError: if the path of a dynamic option cannot be rebuilt
        """
        find_results = []

        def _rebuild_dynpath(path, suffix, dynopt):
            """Append ``suffix`` to every path element from ``dynopt`` on."""
            spath = path.split('.')
            for length in range(1, len(spath)):
                subpath = '.'.join(spath[:length])
                if dynopt == self.impl_get_opt_by_path(subpath):
                    break
            else:
                raise ConfigError(_('cannot find dynpath'))
            subpath = subpath + suffix
            for element in spath[length:]:
                subpath = subpath + '.' + element + suffix
            return subpath

        def _expand_dyn(path, option):
            """Return one ``(path, option)`` pair per suffix of a dyn option."""
            name = option.impl_getname()
            pairs = []
            for suffix in option._subdyn._impl_get_suffixes(context):
                spath = _rebuild_dynpath(path, suffix, option._subdyn)
                pairs.append((spath,
                              option._impl_to_dyn(name + suffix, spath)))
            return pairs

        def _filter_by_name(path, option):
            """Record the option when its (suffixed) name equals ``byname``."""
            name = option.impl_getname()
            if option._is_subdyn():
                matched = False
                if byname.startswith(name):
                    for suffix in option._subdyn._impl_get_suffixes(
                            context):
                        if byname == name + suffix:
                            path = _rebuild_dynpath(path, suffix,
                                                    option._subdyn)
                            option = option._impl_to_dyn(
                                name + suffix, path)
                            matched = True
                            break
                if not matched:
                    return False
            elif byname != name:
                return False
            find_results.append((path, option))
            return True

        opts, paths = self._cache_paths
        for option, path in zip(opts, paths):
            if option.impl_is_optiondescription():
                continue
            if _subpath is not None and not path.startswith(_subpath + '.'):
                continue
            # The type filter applies even when a name is also requested.
            if bytype is not None and not isinstance(option, bytype):
                continue
            if byname is None:
                if option._is_subdyn():
                    find_results.extend(_expand_dyn(path, option))
                else:
                    find_results.append((path, option))
            elif not _filter_by_name(path, option):
                continue
            # A dyn option without suffixes adds nothing: keep searching
            # rather than index an empty list.
            if only_first and find_results:
                return find_results[0]
        return find_results

    def _impl_st_getchildren(self, context, only_dyn=False):
        """Yield the stored children.

        :param context: unused here
        :param only_dyn: when not ``False``, only yield children for which
                         ``impl_is_dynoptiondescription()`` is true
        """
        for child in self._children[1]:
            if only_dyn is False or child.impl_is_dynoptiondescription():
                yield(child)

    def _getattr(self, name, suffix=undefined, context=undefined, dyn=True):
        """Return the child called ``name``.

        With a ``suffix``, ``name`` must end with it and the dyn child is
        built with ``self._impl_get_dynchild`` (not defined in this class).
        Without one, a stored child is returned directly, unless ``dyn`` is
        true and the child is a dyn option description; unknown names are
        looked up with ``self._impl_search_dynchild`` (not defined in this
        class).

        :raises ConfigError: if a suffix is given without a context
        :raises AttributeError: if no matching child is found, or if
                                ``name`` is '_readonly'
        """
        names, children = self._children
        if suffix is not undefined:
            try:
                if undefined in [suffix, context]:  # pragma: optional cover
                    raise ConfigError(_("suffix and context needed if "
                                        "it's a dyn option"))
                if name.endswith(suffix):
                    oname = name[:-len(suffix)]
                    child = children[names.index(oname)]
                    return self._impl_get_dynchild(child, suffix)
            except ValueError:  # pragma: optional cover
                pass
        else:
            try:  # pragma: optional cover
                if name == '_readonly':
                    raise AttributeError(_("{0} instance has no attribute "
                                           "'_readonly'").format(
                                               self.__class__.__name__))
                child = children[names.index(name)]
                if not (dyn and child.impl_is_dynoptiondescription()):
                    return child
            except ValueError:
                child = self._impl_search_dynchild(name, context=context)
                if child != []:
                    return child
        raise AttributeError(_('unknown Option {0} '
                               'in OptionDescription {1}'
                               '').format(name, self.impl_getname()))

    def _get_force_store_value(self):
        """Yield ``(option, path)`` for every cached option whose properties
        contain 'force_store_value'.
        """
        # FIXME add tests (notably when not added to a config)
        # FIXME should be cached!
        opts, paths = self._cache_paths
        for opt, path in zip(opts, paths):
            # '_properties' is unset when the properties were undefined
            if 'force_store_value' in getattr(opt, '_properties', static_set):
                yield (opt, path)