tiramisu/tiramisu/value.py

781 lines
32 KiB
Python

# -*- coding: utf-8 -*-
"takes care of the option's values and multi values"
# Copyright (C) 2013-2018 Team tiramisu (see AUTHORS for all contributors)
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by the
# Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# ____________________________________________________________
from time import time
import weakref
from .error import ConfigError, PropertiesOptionError
from .setting import owners, expires_time, undefined, forbidden_owners
from .autolib import carry_out_calculation
from .i18n import _
class Values(object):
"""The `Config`'s root is indeed in charge of the `Option()`'s values,
but the values are physicaly located here, in `Values`, wich is also
responsible of a caching utility.
"""
__slots__ = ('context',
'_p_',
'__weakref__')
def __init__(self,
context,
storage):
"""
Initializes the values's dict.
:param context: the context is the home config's values
:param storage: where values or owners are stored
"""
self.context = weakref.ref(context)
# store the storage
self._p_ = storage
#______________________________________________________________________
# get context
def _getcontext(self):
"""context is a weakref so context could be None, we need to test it
context is None only if all reference to `Config` object is deleted
(for example we delete a `Config` and we manipulate a reference to
old `SubConfig`, `Values`, `Multi` or `Settings`)
"""
context = self.context()
if context is None:
raise ConfigError(_('the context does not exist anymore'))
return context
#______________________________________________________________________
# get value
def get_cached_value(self,
path,
index,
config_bag):
"""get value directly in cache if set
otherwise calculated value and set it in cache
:param opt: the `Option` that we want to get value
:param path: the path of the `Option`
:param validate: the value must be valid
:param force_permissive: force permissive when check properties
:param setting_properties: global properties
:param self_properties: properties for this `Option`
:param index: index for a slave `Option`
:param display_warnings: display warnings or not
:returns: value
"""
ntime = None
# try to retrive value in cache
setting_properties = config_bag.setting_properties
is_cached = False
if setting_properties and 'cache' in setting_properties and \
self._p_.hascache(path,
index):
if 'expire' in setting_properties:
ntime = int(time())
is_cached, value = self._p_.getcache(path,
ntime,
index)
if not is_cached:
# no cached value so get value
value = self.getvalue(path,
index,
config_bag)
#FIXME suboptimal ...
# validate value
if config_bag.validate:
context = self._getcontext()
opt = config_bag.option
opt.impl_validate(value,
context=context,
force_index=index,
check_error=True,
config_bag=config_bag)
if config_bag.display_warnings:
opt.impl_validate(value,
context=context,
force_index=index,
check_error=False,
config_bag=config_bag)
# store value in cache
if not is_cached and \
setting_properties and 'cache' in setting_properties:
if 'expire' in setting_properties:
if ntime is None:
ntime = int(time())
ntime = ntime + expires_time
self._p_.setcache(path, value, ntime, index)
# and return it
return value
def getvalue(self,
path,
index,
config_bag):
"""actually retrieves the value
:param path: the path of the `Option`
:param index: index for a slave `Option`
:returns: value
"""
# get owner and value from store
# index allowed only for slave
opt = config_bag.option
is_slave = opt.impl_is_master_slaves('slave')
if index is None or not is_slave:
_index = None
else:
_index = index
owner, value = self._p_.getowner(path,
owners.default,
index=_index,
with_value=True)
if owner != owners.default:
# if a value is store in storage, check if not frozen + force_default_on_freeze
# if frozen + force_default_on_freeze => force default value
self_properties = config_bag.properties
if self_properties is None:
settings = self._getcontext().cfgimpl_get_settings()
self_properties = settings.getproperties(path,
index,
config_bag)
config_bag.properties = self_properties
if not ('frozen' in self_properties and \
'force_default_on_freeze' in self_properties):
if index is not None and not is_slave:
if len(value) > index:
return value[index]
#value is smaller than expected
#so return default value
else:
return value
return self._getdefaultvalue(path,
index,
config_bag)
def getdefaultvalue(self,
path,
index,
config_bag):
"""get default value:
- get meta config value or
- get calculated value or
- get default value
:param opt: the `option.Option()` object
:param path: path for `option.Option()` object
:type path: str
:param index: index of a multi/submulti
:type index: int
:returns: default value
"""
return self._getdefaultvalue(path,
index,
config_bag)
def _getdefaultvalue(self,
path,
index,
config_bag):
context = self._getcontext()
opt = config_bag.option
def _reset_cache():
# calculated value could be a new value, so reset cache
context.cfgimpl_reset_cache(opt=opt,
path=path)
if opt.impl_is_master_slaves('slave'):
index_ = index
else:
index_ = None
if self._is_meta(path,
index_,
config_bag):
meta = context.cfgimpl_get_meta()
# retrieved value from meta config
try:
value = meta.getattr(path,
index,
config_bag)
except PropertiesOptionError:
# if properties error, return an other default value
# unexpected error, should not happened
pass
else:
return value
if opt.impl_has_callback():
# if value has callback, calculate value
callback, callback_params = opt.impl_get_callback()
value = carry_out_calculation(opt,
context=context,
callback=callback,
callback_params=callback_params,
index=index,
config_bag=config_bag)
if isinstance(value, list) and index is not None:
# if value is a list and index is set
if opt.impl_is_submulti() and (value == [] or not isinstance(value[0], list)):
# return value only if it's a submulti and not a list of list
_reset_cache()
return value
if len(value) > index:
# return the value for specified index if found
_reset_cache()
return value[index]
# there is no calculate value for this index,
# so return an other default value
elif isinstance(value, list):
# value is a list, but no index specified
_reset_cache()
if opt.impl_is_submulti() and (value != [] and not isinstance(value[0], list)):
# if submulti, return a list of value
return [value]
# otherwise just return the value
return value
elif index is not None:
# if not list but with index
_reset_cache()
if opt.impl_is_submulti():
# if submulti, return a list of value
return [value]
# otherwise just return the value
return value
else:
_reset_cache()
# not a list or index is None
if opt.impl_is_submulti():
# return a list of list for a submulti
return [[value]]
elif opt.impl_is_multi():
# return a list for a multi
return [value]
# not a list, return value
return value
# now try to get default value:
# - if opt is a submulti, return a list a list
# - if opt is a multi, return a list
# - default value
value = opt.impl_getdefault()
if opt.impl_is_multi() and index is not None:
# if index, must return good value for this index
if len(value) > index:
value = value[index]
else:
# no value for this index, retrieve default multi value
# default_multi is already a list for submulti
value = opt.impl_getdefault_multi()
return value
def isempty(self,
opt,
value,
force_allow_empty_list=False,
index=None):
"convenience method to know if an option is empty"
if value is undefined:
return False
else:
empty = opt._empty
if index in [None, undefined] and opt.impl_is_multi():
if force_allow_empty_list:
allow_empty_list = True
else:
allow_empty_list = opt.impl_allow_empty_list()
if allow_empty_list is undefined:
allow_empty_list = opt.impl_is_master_slaves('slave')
isempty = value is None or (not allow_empty_list and value == []) or \
None in value or empty in value
else:
isempty = value is None or value == empty
return isempty
def get_modified_values(self):
return self._p_.get_modified_values()
#______________________________________________________________________
# set value
def setvalue(self,
path,
index,
value,
config_bag,
_commit):
context = self._getcontext()
owner = context.cfgimpl_get_settings().getowner()
if 'validator' in config_bag.setting_properties and config_bag.validate:
if index is not None or config_bag.option._has_consistencies(context):
# set value to a fake config when option has dependency
# validation will be complet in this case (consistency, ...)
tested_context = context._gen_fake_values()
sconfig_bag = config_bag.copy()
sconfig_bag.validate = False
tested_context.cfgimpl_get_values().setvalue(path,
index,
value,
sconfig_bag,
True)
sconfig_bag.validate = True
tested_context.getattr(path,
index,
sconfig_bag)
else:
self.setvalue_validation(path,
index,
value,
config_bag)
self._setvalue(path,
index,
value,
owner,
config_bag,
commit=_commit)
def setvalue_validation(self,
path,
index,
value,
config_bag):
context = self._getcontext()
settings = context.cfgimpl_get_settings()
# First validate properties with this value
self_properties = config_bag.self_properties
if self_properties is None:
self_properties = settings.getproperties(path,
index,
config_bag)
config_bag.properties = self_properties
opt = config_bag.option
if settings.validate_frozen(config_bag):
datas = {'path': path,
'config_bag': config_bag,
'index': index,
'debug': True}
raise PropertiesOptionError(None,
['frozen'],
settings,
datas,
'option')
settings.validate_mandatory(path,
index,
value,
config_bag)
# Value must be valid for option
opt.impl_validate(value,
config_bag,
context,
check_error=True,
force_index=index)
# No error found so emit warnings
opt.impl_validate(value,
config_bag,
context,
check_error=False,
force_index=index)
def _setvalue(self,
path,
index,
value,
owner,
config_bag,
commit=True):
self._getcontext().cfgimpl_reset_cache(opt=config_bag.option,
path=path)
if isinstance(value, list):
# copy
value = list(value)
self._p_.setvalue(path,
value,
owner,
index,
commit)
def _is_meta(self,
path,
index,
config_bag,
force_owner_is_default=False):
if not force_owner_is_default and self._p_.hasvalue(path,
index=index):
# has already a value, so not meta
return False
context = self._getcontext()
meta = context.cfgimpl_get_meta()
if meta is None:
return False
opt = config_bag.option
if opt.impl_is_master_slaves('slave'):
master = opt.impl_get_master_slaves().getmaster()
masterp = master.impl_getpath(context)
# slave could be a "meta" only if master hasn't value
if self._p_.hasvalue(masterp,
index=None):
return False
return not meta.cfgimpl_get_values().is_default_owner(path,
index,
config_bag)
#______________________________________________________________________
# owner
def is_default_owner(self,
path,
index,
config_bag,
validate_meta=undefined):
return self._getowner(path,
index,
config_bag,
validate_meta=validate_meta,
only_default=True) == owners.default
def getowner(self,
path,
index,
config_bag):
"""
retrieves the option's owner
:param opt: the `option.Option` object
:param force_permissive: behaves as if the permissive property
was present
:returns: a `setting.owners.Owner` object
"""
return self._getowner(path,
index,
config_bag)
def _getowner(self,
path,
index,
config_bag,
validate_meta=True,
only_default=False):
"""get owner of an option
"""
context = self._getcontext()
opt = config_bag.option
if opt.impl_is_symlinkoption():
config_bag.ori_option = opt
opt = opt.impl_getopt()
config_bag.option = opt
path = opt.impl_getpath(context)
self_properties = config_bag.properties
if self_properties is None:
self_properties = context.cfgimpl_get_settings().getproperties(path,
index,
config_bag)
config_bag.properties = self_properties
if 'frozen' in self_properties and 'force_default_on_freeze' in self_properties:
return owners.default
if only_default:
if self._p_.hasvalue(path,
index):
owner = undefined
else:
owner = owners.default
else:
owner = self._p_.getowner(path,
owners.default,
index=index)
if owner is owners.default and validate_meta is not False:
meta = context.cfgimpl_get_meta()
if meta is not None and self._is_meta(path,
index,
config_bag):
owner = meta.cfgimpl_get_values()._getowner(path,
index,
config_bag,
only_default=only_default)
return owner
def setowner(self,
path,
index,
owner,
config_bag):
"""
sets a owner to an option
:param opt: the `option.Option` object
:param owner: a valid owner, that is a `setting.owners.Owner` object
"""
opt = config_bag.option
if opt.impl_is_symlinkoption():
raise ConfigError(_("can't set owner for the SymLinkOption \"{}\""
"").format(opt.impl_get_display_name()))
if not isinstance(owner, owners.Owner):
raise ConfigError(_("invalid owner {0}").format(str(owner)))
if owner in forbidden_owners:
raise ConfigError(_('set owner "{0}" is forbidden').format(str(owner)))
if not self._p_.hasvalue(path):
raise ConfigError(_('no value for {0} cannot change owner to {1}'
'').format(path, owner))
self.setowner_validation(path,
index,
config_bag)
self._p_.setowner(path, owner, index=index)
#______________________________________________________________________
# reset
def reset(self,
path,
config_bag,
_commit=True):
context = self._getcontext()
setting = context.cfgimpl_get_settings()
hasvalue = self._p_.hasvalue(path)
if config_bag.validate and hasvalue and 'validator' in config_bag.setting_properties:
fake_context = context._gen_fake_values()
fake_value = fake_context.cfgimpl_get_values()
sconfig_bag = config_bag.copy()
sconfig_bag.validate = False
fake_value.reset(path,
sconfig_bag)
value = fake_value._getdefaultvalue(path,
None,
config_bag)
fake_value.setvalue_validation(path,
None,
value,
config_bag)
opt = config_bag.option
if opt.impl_is_master_slaves('master'):
opt.impl_get_master_slaves().reset(self,
config_bag,
_commit=_commit)
if hasvalue:
if 'force_store_value' in setting.getproperties(path,
None,
config_bag):
value = self._getdefaultvalue(path,
None,
config_bag)
self._setvalue(path,
None,
value,
owners.forced,
config_bag,
commit=_commit)
else:
self._p_.resetvalue(path,
_commit)
context.cfgimpl_reset_cache(opt=opt,
path=path)
def reset_slave(self,
path,
index,
config_bag):
context = self._getcontext()
if config_bag.validate and 'validator' in config_bag.setting_properties:
fake_context = context._gen_fake_values()
fake_value = fake_context.cfgimpl_get_values()
sconfig_bag = config_bag.copy()
sconfig_bag.validate = False
fake_value.reset_slave(path,
index,
sconfig_bag)
value = fake_value._getdefaultvalue(path,
index,
config_bag)
fake_value.setvalue_validation(path,
index,
value,
config_bag)
self._p_.resetvalue_index(path, index)
def reset_master(self,
subconfig,
path,
index,
config_bag):
current_value = self.get_cached_value(path,
None,
config_bag)
current_value.pop(index)
self.setvalue(path,
None,
current_value,
config_bag,
_commit=True)
subconfig.cfgimpl_get_description().pop(self,
index,
config_bag)
def setowner_validation(self,
path,
index,
config_bag):
context = self._getcontext()
settings = context.cfgimpl_get_settings()
# First validate properties with this value
self_properties = config_bag.properties
if self_properties is None:
self_properties = settings.getproperties(path,
None,
config_bag)
config_bag.properties = self_properties
if settings.validate_frozen(config_bag):
datas = {'path': path,
'config_bag': config_bag,
'index': index,
'debug': True}
raise PropertiesOptionError(None,
['frozen'],
settings,
datas,
'option')
#______________________________________________________________________
# information
def set_information(self, key, value):
"""updates the information's attribute
:param key: information's key (ex: "help", "doc"
:param value: information's value (ex: "the help string")
"""
self._p_.set_information(key, value)
def get_information(self, key, default=undefined):
"""retrieves one information's item
:param key: the item string (ex: "help")
"""
return self._p_.get_information(key, default)
def del_information(self, key, raises=True):
self._p_.del_information(key, raises)
#______________________________________________________________________
# mandatory warnings
def mandatory_warnings(self,
config_bag):
"""convenience function to trace Options that are mandatory and
where no value has been set
:returns: generator of mandatory Option's path
"""
context = self._getcontext()
settings = context.cfgimpl_get_settings()
# copy
od_setting_properties = config_bag.setting_properties - {'mandatory', 'empty'}
setting_properties = set(config_bag.setting_properties)
setting_properties.update(['mandatory', 'empty'])
config_bag.setting_properties = frozenset(setting_properties)
config_bag.force_permissive = True
config_bag.display_warnings = False
def _mandatory_warnings(description, currpath, config):
is_masterslaves = description.impl_is_master_slaves()
lenmaster = None
optmaster = None
pathmaster = None
for option in description.impl_getchildren(config_bag):
sconfig_bag = config_bag.copy('nooption')
sconfig_bag.option = option
name = option.impl_getname()
path = '.'.join(currpath + [name])
if option.impl_is_optiondescription():
sconfig_bag.setting_properties = od_setting_properties
try:
subconfig = config.getattr(name,
None,
sconfig_bag)
except PropertiesOptionError as err:
pass
else:
for path in _mandatory_warnings(option,
currpath + [name],
subconfig):
yield path
elif not option.impl_is_symlinkoption():
# don't check symlink
self_properties = settings.getproperties(path,
None,
sconfig_bag)
sconfig_bag.properties = self_properties
if 'mandatory' in self_properties or 'empty' in self_properties:
try:
if option.impl_is_master_slaves('slave'):
if lenmaster is None:
# master is a length (so int) if value is already calculated
# otherwise get value and calculate length
nconfig_bag = config_bag.copy('nooption')
nconfig_bag.option = optmaster
values = config.getattr(pathmaster,
None,
nconfig_bag)
lenmaster = len(values)
#if not lenmaster:
# settings.validate_properties(path,
# None,
# sconfig_bag)
#else:
for index in range(lenmaster):
values = config.getattr(name,
index,
sconfig_bag)
else:
value = config.getattr(name,
None,
sconfig_bag)
if is_masterslaves:
lenmaster = len(value)
pathmaster = name
optmaster = option
except PropertiesOptionError as err:
if err.proptype == ['mandatory']:
yield path
if is_masterslaves and lenmaster is None:
break
except ConfigError as err:
#assume that uncalculated value is an empty value
yield path
if is_masterslaves and lenmaster is None:
break
descr = context.cfgimpl_get_description()
for path in _mandatory_warnings(descr, [], context):
yield path