coverage
This commit is contained in:
parent
8be042434b
commit
46e5179d5e
|
@ -565,7 +565,9 @@ def test_callback_value_incr():
|
||||||
val2 = IntOption('val2', "", callback=return_value, callback_params=Params(kwargs={'value': ParamOption(val1)}))
|
val2 = IntOption('val2', "", callback=return_value, callback_params=Params(kwargs={'value': ParamOption(val1)}))
|
||||||
maconfig = OptionDescription('rootconfig', '', [val1, val2])
|
maconfig = OptionDescription('rootconfig', '', [val1, val2])
|
||||||
cfg = Config(maconfig)
|
cfg = Config(maconfig)
|
||||||
cfg.cache.expiration_time(1)
|
assert cfg.cache.get_expiration_time() == 5
|
||||||
|
cfg.cache.set_expiration_time(1)
|
||||||
|
assert cfg.cache.get_expiration_time() == 1
|
||||||
cfg.property.read_write()
|
cfg.property.read_write()
|
||||||
assert cfg.option('val1').value.get() == 1
|
assert cfg.option('val1').value.get() == 1
|
||||||
sleep(1)
|
sleep(1)
|
||||||
|
|
|
@ -7,7 +7,6 @@ from tiramisu.setting import groups, owners
|
||||||
from tiramisu import ChoiceOption, BoolOption, IntOption, IPOption, NetmaskOption, \
|
from tiramisu import ChoiceOption, BoolOption, IntOption, IPOption, NetmaskOption, \
|
||||||
StrOption, OptionDescription, Leadership, Config
|
StrOption, OptionDescription, Leadership, Config
|
||||||
from tiramisu.error import LeadershipError, PropertiesOptionError, APIError, ConfigError
|
from tiramisu.error import LeadershipError, PropertiesOptionError, APIError, ConfigError
|
||||||
from tiramisu.api import TIRAMISU_VERSION
|
|
||||||
from tiramisu.storage import list_sessions
|
from tiramisu.storage import list_sessions
|
||||||
|
|
||||||
|
|
||||||
|
@ -107,7 +106,6 @@ def test_get_group_type():
|
||||||
assert grp.group_type() == groups.family
|
assert grp.group_type() == groups.family
|
||||||
assert grp.group_type() == 'family'
|
assert grp.group_type() == 'family'
|
||||||
assert isinstance(grp.group_type(), groups.GroupType)
|
assert isinstance(grp.group_type(), groups.GroupType)
|
||||||
#raises(TypeError, 'grp.impl_set_group_type(groups.default)')
|
|
||||||
|
|
||||||
|
|
||||||
def test_iter_on_groups():
|
def test_iter_on_groups():
|
||||||
|
@ -237,11 +235,11 @@ def test_groups_is_leader():
|
||||||
assert api.option('leadership.netmask_admin_eth0').option.defaultmulti() == 'value'
|
assert api.option('leadership.netmask_admin_eth0').option.defaultmulti() == 'value'
|
||||||
|
|
||||||
|
|
||||||
if TIRAMISU_VERSION != 2:
|
|
||||||
def test_groups_with_leader_in_root():
|
def test_groups_with_leader_in_root():
|
||||||
ip_admin_eth0 = StrOption('ip_admin_eth0', "ip réseau autorisé", multi=True)
|
ip_admin_eth0 = StrOption('ip_admin_eth0', "ip réseau autorisé", multi=True)
|
||||||
netmask_admin_eth0 = StrOption('netmask_admin_eth0', "masque du sous-réseau", multi=True)
|
netmask_admin_eth0 = StrOption('netmask_admin_eth0', "masque du sous-réseau", multi=True)
|
||||||
interface1 = Leadership('ip_admin_eth0', '', [ip_admin_eth0, netmask_admin_eth0])
|
interface1 = Leadership('ip_admin_eth0', '', [ip_admin_eth0, netmask_admin_eth0])
|
||||||
|
interface1
|
||||||
raises(ConfigError, "Config(interface1)")
|
raises(ConfigError, "Config(interface1)")
|
||||||
|
|
||||||
|
|
||||||
|
@ -269,6 +267,44 @@ def test_groups_with_leader_make_dict():
|
||||||
assert api.value.dict() == {'ip_admin_eth0.ip_admin_eth0': ['ip1', 'ip2'], 'ip_admin_eth0.netmask_admin_eth0': [None, None]}
|
assert api.value.dict() == {'ip_admin_eth0.ip_admin_eth0': ['ip1', 'ip2'], 'ip_admin_eth0.netmask_admin_eth0': [None, None]}
|
||||||
|
|
||||||
|
|
||||||
|
def test_groups_with_leader_default_value():
|
||||||
|
ip_admin_eth0 = StrOption('ip_admin_eth0', "ip réseau autorisé", multi=True)
|
||||||
|
netmask_admin_eth0 = StrOption('netmask_admin_eth0', "masque du sous-réseau", multi=True)
|
||||||
|
interface1 = Leadership('ip_admin_eth0', '', [ip_admin_eth0, netmask_admin_eth0])
|
||||||
|
od = OptionDescription('root', '', [interface1])
|
||||||
|
api = Config(od)
|
||||||
|
assert api.option('ip_admin_eth0.ip_admin_eth0').value.get() == []
|
||||||
|
assert api.option('ip_admin_eth0.ip_admin_eth0').value.default() == []
|
||||||
|
api.option('ip_admin_eth0.ip_admin_eth0').value.set(['ip1', 'ip2'])
|
||||||
|
assert api.option('ip_admin_eth0.ip_admin_eth0').value.get() == ['ip1', 'ip2']
|
||||||
|
assert api.option('ip_admin_eth0.ip_admin_eth0').value.default() == []
|
||||||
|
|
||||||
|
|
||||||
|
def test_groups_with_leader_default_value_2():
|
||||||
|
ip_admin_eth0 = StrOption('ip_admin_eth0', "ip réseau autorisé", ['ip1', 'ip2'], multi=True)
|
||||||
|
netmask_admin_eth0 = StrOption('netmask_admin_eth0', "masque du sous-réseau", default_multi='netmask1', multi=True)
|
||||||
|
interface1 = Leadership('ip_admin_eth0', '', [ip_admin_eth0, netmask_admin_eth0])
|
||||||
|
od = OptionDescription('root', '', [interface1])
|
||||||
|
api = Config(od)
|
||||||
|
assert api.option('ip_admin_eth0.ip_admin_eth0').value.get() == ['ip1', 'ip2']
|
||||||
|
assert api.option('ip_admin_eth0.ip_admin_eth0').value.default() == ['ip1', 'ip2']
|
||||||
|
api.option('ip_admin_eth0.ip_admin_eth0').value.set(['ip3', 'ip4'])
|
||||||
|
assert api.option('ip_admin_eth0.ip_admin_eth0').value.get() == ['ip3', 'ip4']
|
||||||
|
assert api.option('ip_admin_eth0.ip_admin_eth0').value.default() == ['ip1', 'ip2']
|
||||||
|
#
|
||||||
|
assert api.option('ip_admin_eth0.netmask_admin_eth0', 0).value.get() == 'netmask1'
|
||||||
|
assert api.option('ip_admin_eth0.netmask_admin_eth0', 1).value.get() == 'netmask1'
|
||||||
|
assert api.option('ip_admin_eth0.netmask_admin_eth0', 0).value.default() == 'netmask1'
|
||||||
|
assert api.option('ip_admin_eth0.netmask_admin_eth0', 1).value.default() == 'netmask1'
|
||||||
|
assert api.option('ip_admin_eth0.netmask_admin_eth0').value.default() == ['netmask1', 'netmask1']
|
||||||
|
api.option('ip_admin_eth0.netmask_admin_eth0', 1).value.set('netmask2')
|
||||||
|
assert api.option('ip_admin_eth0.netmask_admin_eth0', 0).value.get() == 'netmask1'
|
||||||
|
assert api.option('ip_admin_eth0.netmask_admin_eth0', 1).value.get() == 'netmask2'
|
||||||
|
assert api.option('ip_admin_eth0.netmask_admin_eth0', 0).value.default() == 'netmask1'
|
||||||
|
assert api.option('ip_admin_eth0.netmask_admin_eth0', 1).value.default() == 'netmask1'
|
||||||
|
assert api.option('ip_admin_eth0.netmask_admin_eth0').value.default() == ['netmask1', 'netmask1']
|
||||||
|
|
||||||
|
|
||||||
def test_groups_with_leader_hidden_in_config():
|
def test_groups_with_leader_hidden_in_config():
|
||||||
ip_admin_eth0 = StrOption('ip_admin_eth0', "ip réseau autorisé", multi=True, properties=('hidden',))
|
ip_admin_eth0 = StrOption('ip_admin_eth0', "ip réseau autorisé", multi=True, properties=('hidden',))
|
||||||
netmask_admin_eth0 = StrOption('netmask_admin_eth0', "masque du sous-réseau", multi=True, properties=('hidden',))
|
netmask_admin_eth0 = StrOption('netmask_admin_eth0', "masque du sous-réseau", multi=True, properties=('hidden',))
|
||||||
|
|
|
@ -557,6 +557,26 @@ def test_reset_properties_force_store_value():
|
||||||
'gc.dummy': set(('test', 'force_store_value'))}
|
'gc.dummy': set(('test', 'force_store_value'))}
|
||||||
|
|
||||||
|
|
||||||
|
def test_importation_force_store_value():
|
||||||
|
gcdummy = BoolOption('dummy', 'dummy', default=False,
|
||||||
|
properties=('force_store_value',))
|
||||||
|
gcgroup = OptionDescription('gc', '', [gcdummy])
|
||||||
|
descr = OptionDescription('tiramisu', '', [gcgroup])
|
||||||
|
config1 = Config(descr)
|
||||||
|
assert config1.value.exportation() == [[], [], [], []]
|
||||||
|
config1.property.add('frozen')
|
||||||
|
assert config1.value.exportation() == [[], [], [], []]
|
||||||
|
config1.property.add('force_store_value')
|
||||||
|
assert config1.value.exportation() == [['gc.dummy'], [None], [False], ['forced']]
|
||||||
|
exportation = config1.property.exportation()
|
||||||
|
config2 = Config(descr)
|
||||||
|
assert config2.value.exportation() == [[], [], [], []]
|
||||||
|
config2.property.importation(exportation)
|
||||||
|
assert config2.value.exportation() == [['gc.dummy'], [None], [False], ['forced']]
|
||||||
|
config2.property.importation(exportation)
|
||||||
|
assert config2.value.exportation() == [['gc.dummy'], [None], [False], ['forced']]
|
||||||
|
|
||||||
|
|
||||||
def test_set_modified_value():
|
def test_set_modified_value():
|
||||||
gcdummy = BoolOption('dummy', 'dummy', default=False, properties=('force_store_value',))
|
gcdummy = BoolOption('dummy', 'dummy', default=False, properties=('force_store_value',))
|
||||||
gcgroup = OptionDescription('gc', '', [gcdummy])
|
gcgroup = OptionDescription('gc', '', [gcdummy])
|
||||||
|
|
|
@ -477,6 +477,7 @@ class _TiramisuOptionValueOption:
|
||||||
idx,
|
idx,
|
||||||
self._option_bag.config_bag)
|
self._option_bag.config_bag)
|
||||||
value.append(values.getdefaultvalue(soption_bag))
|
value.append(values.getdefaultvalue(soption_bag))
|
||||||
|
return value
|
||||||
else:
|
else:
|
||||||
return values.getdefaultvalue(self._option_bag)
|
return values.getdefaultvalue(self._option_bag)
|
||||||
|
|
||||||
|
@ -945,7 +946,7 @@ class TiramisuContextProperty(TiramisuContext):
|
||||||
|
|
||||||
def importation(self, properties):
|
def importation(self, properties):
|
||||||
"""Import config properties"""
|
"""Import config properties"""
|
||||||
if 'force_store_value' in properties:
|
if 'force_store_value' in properties.get(None, []):
|
||||||
force_store_value = 'force_store_value' not in self._config_bag.properties
|
force_store_value = 'force_store_value' not in self._config_bag.properties
|
||||||
else:
|
else:
|
||||||
force_store_value = False
|
force_store_value = False
|
||||||
|
@ -1172,17 +1173,6 @@ class _TiramisuContextConfigReset():
|
||||||
# Remove cache
|
# Remove cache
|
||||||
self._config_bag.context.cfgimpl_reset_cache(None, None)
|
self._config_bag.context.cfgimpl_reset_cache(None, None)
|
||||||
|
|
||||||
def __call__(self,
|
|
||||||
path: Optional[str]):
|
|
||||||
"""select a child Tiramisu config"""
|
|
||||||
if path is None:
|
|
||||||
return Config(self._config_bag)
|
|
||||||
spaths = path.split('.')
|
|
||||||
config = self._config_bag.context
|
|
||||||
for spath in spaths:
|
|
||||||
config = config.getconfig(spath)
|
|
||||||
return Config(config)
|
|
||||||
|
|
||||||
|
|
||||||
class _TiramisuContextConfig(TiramisuContext, _TiramisuContextConfigReset):
|
class _TiramisuContextConfig(TiramisuContext, _TiramisuContextConfigReset):
|
||||||
"""Actions to Config"""
|
"""Actions to Config"""
|
||||||
|
@ -1275,10 +1265,13 @@ class TiramisuContextCache(TiramisuContext):
|
||||||
def reset(self):
|
def reset(self):
|
||||||
self._config_bag.context.cfgimpl_reset_cache(None, None)
|
self._config_bag.context.cfgimpl_reset_cache(None, None)
|
||||||
|
|
||||||
def expiration_time(self,
|
def set_expiration_time(self,
|
||||||
time: int):
|
time: int) -> None:
|
||||||
self._config_bag.expiration_time = time
|
self._config_bag.expiration_time = time
|
||||||
|
|
||||||
|
def get_expiration_time(self) -> int:
|
||||||
|
return self._config_bag.expiration_time
|
||||||
|
|
||||||
|
|
||||||
class TiramisuDispatcher:
|
class TiramisuDispatcher:
|
||||||
pass
|
pass
|
||||||
|
|
|
@ -282,7 +282,7 @@ class Base:
|
||||||
return self._name
|
return self._name
|
||||||
|
|
||||||
def _set_readonly(self) -> None:
|
def _set_readonly(self) -> None:
|
||||||
if not self.impl_is_readonly():
|
if isinstance(self._informations, dict):
|
||||||
_setattr = object.__setattr__
|
_setattr = object.__setattr__
|
||||||
dico = self._informations
|
dico = self._informations
|
||||||
keys = tuple(dico.keys())
|
keys = tuple(dico.keys())
|
||||||
|
|
|
@ -59,22 +59,16 @@ class NetmaskOption(StrOption):
|
||||||
raise ConfigError(_('network_netmask needs a network and a netmask'))
|
raise ConfigError(_('network_netmask needs a network and a netmask'))
|
||||||
if None in vals or len(vals) != 2:
|
if None in vals or len(vals) != 2:
|
||||||
return
|
return
|
||||||
msg = None
|
|
||||||
val_netmask, val_network = vals
|
val_netmask, val_network = vals
|
||||||
try:
|
try:
|
||||||
ip_network('{0}/{1}'.format(val_network, val_netmask))
|
ip_network('{0}/{1}'.format(val_network, val_netmask))
|
||||||
except ValueError:
|
except ValueError:
|
||||||
if current_opt == opts[1]:
|
if current_opt == opts[1]:
|
||||||
raise ValueError(_('with netmask "{0}" ("{1}")').format(val_netmask, opts[0].impl_get_display_name()))
|
raise ValueError(_('with netmask "{0}" ("{1}")').format(val_netmask,
|
||||||
|
opts[0].impl_get_display_name()))
|
||||||
else:
|
else:
|
||||||
raise ValueError(_('with network "{0}" ("{1}")').format(val_network, opts[1].impl_get_display_name()))
|
raise ValueError(_('with network "{0}" ("{1}")').format(val_network,
|
||||||
if msg is not None:
|
opts[1].impl_get_display_name()))
|
||||||
self.raise_err(msg,
|
|
||||||
val_netmask,
|
|
||||||
val_network,
|
|
||||||
current_opt,
|
|
||||||
opts,
|
|
||||||
'network')
|
|
||||||
|
|
||||||
def _cons_ip_netmask(self,
|
def _cons_ip_netmask(self,
|
||||||
current_opt,
|
current_opt,
|
||||||
|
@ -90,46 +84,22 @@ class NetmaskOption(StrOption):
|
||||||
return
|
return
|
||||||
msg = None
|
msg = None
|
||||||
val_netmask, val_ip = vals
|
val_netmask, val_ip = vals
|
||||||
try:
|
|
||||||
ip = ip_interface('{0}/{1}'.format(val_ip, val_netmask))
|
ip = ip_interface('{0}/{1}'.format(val_ip, val_netmask))
|
||||||
network = ip.network
|
network = ip.network
|
||||||
# if not ip same has network
|
|
||||||
if ip.ip == network.network_address:
|
if ip.ip == network.network_address:
|
||||||
if not _cidr and current_opt == opts[1]:
|
if not _cidr and current_opt == opts[1]:
|
||||||
msg = _('this is a network with netmask "{0}" ("{1}")')
|
msg = _('this is a network with netmask "{0}" ("{1}")')
|
||||||
else:
|
else:
|
||||||
msg = _('{2} "{0}" ("{1}") is the network')
|
msg = _('IP "{0}" ("{1}") is the network')
|
||||||
elif ip.ip == network.broadcast_address:
|
elif ip.ip == network.broadcast_address:
|
||||||
if not _cidr and current_opt == opts[1]:
|
if not _cidr and current_opt == opts[1]:
|
||||||
msg = _('this is a broadcast with netmask "{0}" ("{1}")')
|
msg = _('this is a broadcast with netmask "{0}" ("{1}")')
|
||||||
else:
|
else:
|
||||||
msg = _('{2} "{0}" ("{1}") is the broadcast')
|
msg = _('IP "{0}" ("{1}") is the broadcast')
|
||||||
except ValueError:
|
|
||||||
import traceback
|
|
||||||
traceback.print_exc()
|
|
||||||
pass
|
|
||||||
if msg is not None:
|
if msg is not None:
|
||||||
self.raise_err(msg,
|
|
||||||
val_netmask,
|
|
||||||
val_ip,
|
|
||||||
current_opt,
|
|
||||||
opts,
|
|
||||||
'IP',
|
|
||||||
_cidr)
|
|
||||||
|
|
||||||
|
|
||||||
def raise_err(self,
|
|
||||||
msg,
|
|
||||||
val_netmask,
|
|
||||||
val_ipnetwork,
|
|
||||||
current_opt,
|
|
||||||
opts,
|
|
||||||
typ,
|
|
||||||
_cidr=False):
|
|
||||||
if not _cidr and current_opt == opts[1]:
|
if not _cidr and current_opt == opts[1]:
|
||||||
raise ValueError(msg.format(val_netmask,
|
raise ValueError(msg.format(val_netmask,
|
||||||
opts[1].impl_get_display_name()))
|
opts[0].impl_get_display_name()))
|
||||||
else:
|
else:
|
||||||
raise ValueError(msg.format(val_ipnetwork,
|
raise ValueError(msg.format(val_ip,
|
||||||
opts[0].impl_get_display_name(),
|
opts[1].impl_get_display_name()))
|
||||||
typ))
|
|
||||||
|
|
|
@ -206,7 +206,13 @@ class Option(BaseOption):
|
||||||
|
|
||||||
def impl_get_extra(self,
|
def impl_get_extra(self,
|
||||||
key: str) -> Any:
|
key: str) -> Any:
|
||||||
return getattr(self, '_extra', {}).get(key)
|
extra = getattr(self, '_extra', {})
|
||||||
|
if isinstance(extra, tuple):
|
||||||
|
if key in extra[0]:
|
||||||
|
return extra[1][extra[0].index(key)]
|
||||||
|
return None
|
||||||
|
else:
|
||||||
|
return extra.get(key)
|
||||||
|
|
||||||
#__________________________________________________________________________
|
#__________________________________________________________________________
|
||||||
# validator
|
# validator
|
||||||
|
|
Loading…
Reference in New Issue