reorganise consistencies

This commit is contained in:
Emmanuel Garette 2017-12-23 10:40:41 +01:00
parent 538e6a792a
commit 7ab479f628
11 changed files with 1052 additions and 218 deletions

View File

@ -0,0 +1,837 @@
from .autopath import do_autopath
do_autopath()
from py.test import raises
from tiramisu.setting import owners, groups
from tiramisu import IPOption, NetworkOption, NetmaskOption, IntOption,\
BroadcastOption, StrOption, SymLinkOption, OptionDescription, submulti, MasterSlaves,\
Config, getapi, undefined
from tiramisu.error import ConfigError, ValueWarning, PropertiesOptionError
import warnings
def return_value(value=None):
return value
def test_consistency():
a = IntOption('a', '')
b = IntOption('b', '')
a.impl_add_consistency('not_equal', b)
#consistency to itself
raises(ConfigError, "a.impl_add_consistency('not_equal', a)")
def test_consistency_not_exists():
a = IntOption('a', '')
b = IntOption('b', '')
a, b
raises(ConfigError, "a.impl_add_consistency('not_exists', b)")
def test_consistency_unknown_params():
a = IntOption('a', '')
b = IntOption('b', '')
a, b
raises(ValueError, "a.impl_add_consistency('not_equal', b, unknown=False)")
def test_consistency_warnings_only_default():
a = IntOption('a', '', 1)
b = IntOption('b', '', 1)
warnings.simplefilter("always", ValueWarning)
with warnings.catch_warnings(record=True) as w:
a.impl_add_consistency('not_equal', b, warnings_only=True)
assert w != []
def test_consistency_warnings_only():
a = IntOption('a', '')
b = IntOption('b', '')
od = OptionDescription('od', '', [a, b])
a.impl_add_consistency('not_equal', b, warnings_only=True)
api = getapi(Config(od))
api.option('a').value.set(1)
warnings.simplefilter("always", ValueWarning)
with warnings.catch_warnings(record=True) as w:
api.option('b').value.set(1)
assert w != []
def test_consistency_warnings_only_more_option():
a = IntOption('a', '')
b = IntOption('b', '')
d = IntOption('d', '')
od = OptionDescription('od', '', [a, b, d])
a.impl_add_consistency('not_equal', b, d, warnings_only=True)
api = getapi(Config(od))
api.option('a').value.set(1)
warnings.simplefilter("always", ValueWarning)
with warnings.catch_warnings(record=True) as w:
api.option('b').value.set(1)
assert w != []
assert len(w) == 1
with warnings.catch_warnings(record=True) as w:
api.option('d').value.get()
assert w != []
assert len(w) == 1
with warnings.catch_warnings(record=True) as w:
api.option('d').value.set(1)
assert w != []
assert len(w) == 1
def test_consistency_not_equal():
a = IntOption('a', '')
b = IntOption('b', '')
od = OptionDescription('od', '', [a, b])
a.impl_add_consistency('not_equal', b)
api = getapi(Config(od))
assert api.option('a').value.get() is None
assert api.option('b').value.get() is None
api.option('a').value.set(1)
api.option('a').value.reset()
api.option('a').value.set(1)
raises(ValueError, "api.option('b').value.set(1)")
api.option('b').value.set(2)
def test_consistency_not_equal_many_opts():
a = IntOption('a', '')
b = IntOption('b', '')
c = IntOption('c', '')
d = IntOption('d', '')
e = IntOption('e', '')
f = IntOption('f', '')
od = OptionDescription('od', '', [a, b, c, d, e, f])
a.impl_add_consistency('not_equal', b, c, d, e, f)
api = getapi(Config(od))
assert api.option('a').value.get() is None
assert api.option('b').value.get() is None
#
api.option('a').value.set(1)
api.option('a').value.reset()
#
api.option('a').value.set(1)
raises(ValueError, "api.option('b').value.set(1)")
#
api.option('b').value.set(2)
raises(ValueError, "api.option('f').value.set(2)")
raises(ValueError, "api.option('f').value.set(1)")
#
api.option('d').value.set(3)
raises(ValueError, "api.option('f').value.set(3)")
raises(ValueError, "api.option('a').value.set(3)")
api.option('d').value.set(3)
raises(ValueError, "api.option('c').value.set(3)")
raises(ValueError, "api.option('e').value.set(3)")
def test_consistency_not_equal_many_opts_one_disabled():
a = IntOption('a', '')
b = IntOption('b', '')
c = IntOption('c', '')
d = IntOption('d', '')
e = IntOption('e', '')
f = IntOption('f', '')
g = IntOption('g', '', properties=('disabled',))
od = OptionDescription('od', '', [a, b, c, d, e, f, g])
a.impl_add_consistency('not_equal', b, c, d, e, f, g, transitive=False)
api = getapi(Config(od))
api.property.read_write()
assert api.option('a').value.get() is None
assert api.option('b').value.get() is None
#
api.option('a').value.set(1)
api.option('a').value.reset()
#
api.option('a').value.set(1)
raises(ValueError, "api.option('b').value.set(1)")
#
api.option('b').value.set(2)
raises(ValueError, "api.option('f').value.set(2)")
raises(ValueError, "api.option('f').value.set(1)")
#
api.option('d').value.set(3)
raises(ValueError, "api.option('f').value.set(3)")
raises(ValueError, "api.option('a').value.set(3)")
raises(ValueError, "api.option('c').value.set(3)")
raises(ValueError, "api.option('e').value.set(3)")
def test_consistency_not_in_config_1():
a = IntOption('a', '')
b = IntOption('b', '')
a.impl_add_consistency('not_equal', b)
od1 = OptionDescription('od1', '', [a])
od = OptionDescription('root', '', [od1])
od
raises(ConfigError, "Config(od)")
def test_consistency_not_in_config_2():
a = IntOption('a', '')
b = IntOption('b', '')
a.impl_add_consistency('not_equal', b)
od1 = OptionDescription('od1', '', [a])
od2 = OptionDescription('od2', '', [b])
od = OptionDescription('root', '', [od1, od2])
Config(od)
def test_consistency_not_in_config_3():
a = IntOption('a', '')
b = IntOption('b', '')
a.impl_add_consistency('not_equal', b)
od1 = OptionDescription('od1', '', [a])
od2 = OptionDescription('od2', '', [b])
od = OptionDescription('root', '', [od1, od2])
od
#with subconfig
raises(ConfigError, "Config(od1)")
def test_consistency_after_config():
a = IntOption('a', '')
b = IntOption('b', '')
od1 = OptionDescription('od1', '', [a])
od2 = OptionDescription('od2', '', [b])
od = OptionDescription('root', '', [od1, od2])
Config(od)
raises(AttributeError, "a.impl_add_consistency('not_equal', b)")
def test_consistency_not_equal_symlink():
a = IntOption('a', '')
b = IntOption('b', '')
c = SymLinkOption('c', a)
od = OptionDescription('od', '', [a, b, c])
a.impl_add_consistency('not_equal', b)
api = getapi(Config(od))
assert set(od._cache_consistencies.keys()) == set([a, b])
def test_consistency_not_equal_submulti():
a = IntOption('a', '', multi=submulti)
b = IntOption('b', '', multi=submulti)
od = OptionDescription('a', '', [a, b])
raises(ConfigError, 'a.impl_add_consistency("not_equal", b)')
def test_consistency_not_equal_default_submulti():
a = IntOption('a', '', [[1, 2]], multi=submulti)
b = IntOption('b', '', [[1]], multi=submulti)
od = OptionDescription('od', '', [a, b])
od
raises(ConfigError, "a.impl_add_consistency('not_equal', b)")
def test_consistency_not_equal_masterslave():
a = IntOption('a', '', multi=True)
b = IntOption('b', '', multi=True)
od = MasterSlaves('a', '', [a, b])
od2 = OptionDescription('b', '', [od])
#od.impl_set_group_type(groups.master)
a.impl_add_consistency('not_equal', b)
api = getapi(Config(od2))
assert api.option('a.a').value.get() == []
api.option('a.a').value.set([1])
api.option('a.a').value.reset()
api.option('a.a').value.set([1])
raises(ValueError, "api.option('a.b', 0).value.set(1)")
api.option('a.b', 0).value.set(2)
api.option('a.a').value.reset()
api.option('a.a').value.set([1])
api.option.make_dict()
def test_consistency_not_equal_masterslave_error_multi1():
a = IPOption('a', '', multi=True)
b = NetmaskOption('b', '', multi=True)
c = NetmaskOption('c', '', multi=True)
od = MasterSlaves('a', '', [a, b])
#od.impl_set_group_type(groups.master)
od2 = OptionDescription('b', '', [od, c])
c.impl_add_consistency('ip_netmask', a)
raises(ConfigError, "Config(od2)")
def test_consistency_not_equal_masterslave_error_multi2():
a = IPOption('a', '', multi=True)
b = NetmaskOption('b', '', multi=True)
c = IPOption('c', '', multi=True)
od = MasterSlaves('a', '', [a, b])
#od.impl_set_group_type(groups.master)
od2 = OptionDescription('b', '', [od, c])
b.impl_add_consistency('ip_netmask', c)
raises(ConfigError, "Config(od2)")
def test_consistency_not_equal_masterslave_error_othermaster():
a = IPOption('a', '', multi=True)
b = NetmaskOption('b', '', multi=True)
c = IPOption('c', '', multi=True)
d = NetmaskOption('d', '', multi=True)
od = MasterSlaves('a', '', [a, b])
#od.impl_set_group_type(groups.master)
od2 = MasterSlaves('c', '', [c, d])
#od2.impl_set_group_type(groups.master)
od3 = OptionDescription('b', '', [od, od2])
d.impl_add_consistency('ip_netmask', a)
raises(ConfigError, "Config(od2)")
def test_consistency_not_equal_masterslaves_default():
a = IntOption('a', '', multi=True)
b = IntOption('b', '', multi=True, default_multi=1)
od = MasterSlaves('a', '', [a, b])
od2 = OptionDescription('a', '', [od])
#od.impl_set_group_type(groups.master)
a.impl_add_consistency('not_equal', b)
api = getapi(Config(od2))
assert api.option('a.a').value.get() == []
raises(ValueError, "api.option('a.a').value.set([1])")
api.option('a.a').value.set([2])
api.option('a.a').value.reset()
def test_consistency_not_equal_multi():
a = IntOption('a', '', multi=True)
b = IntOption('b', '', multi=True)
od = OptionDescription('a', '', [a, b])
a.impl_add_consistency('not_equal', b)
api = getapi(Config(od))
assert api.option('a').value.get() == []
assert api.option('b').value.get() == []
api.option('a').value.set([1])
api.option('a').value.reset()
api.option('a').value.set([1])
raises(ValueError, "api.option('b').value.set([1])")
api.option('a').value.set([2])
raises(ValueError, "api.option('b').value.set([2, 1])")
api.option('a').value.set([2, 3])
raises(ValueError, "api.option('a').value.set([2, 3, 3])")
raises(ValueError, "api.option('b').value.set([2, 3])")
def test_consistency_not_equal_multi_default():
a = IntOption('a', '', multi=True, default=[1])
b = IntOption('b', '', multi=True, default=[1, 2])
od = OptionDescription('a', '', [a, b])
raises(ValueError, "a.impl_add_consistency('not_equal', b)")
def test_consistency_not_equal_multi_default_modif():
a = IntOption('a', '', multi=True)
b = IntOption('b', '', multi=True, default=[1, 2])
od = OptionDescription('a', '', [a, b])
a.impl_add_consistency('not_equal', b)
api = getapi(Config(od))
assert api.option('a').value.get() == []
assert api.option('b').value.get() == [1, 2]
raises(ValueError, "api.option('a').value.set([1])")
raises(ValueError, "api.option('b').value.set([1, 2, 1])")
def test_consistency_default():
a = IntOption('a', '', 1)
b = IntOption('b', '', 1)
a, b
raises(ValueError, "a.impl_add_consistency('not_equal', b)")
def test_consistency_default_multi():
a = IntOption('a', '', [2, 1], multi=True)
b = IntOption('b', '', [1, 1], multi=True)
c = IntOption('c', '', [1, 2], multi=True)
b
raises(ValueError, "a.impl_add_consistency('not_equal', b)")
raises(ValueError, "a.impl_add_consistency('not_equal', c)")
def test_consistency_default_diff():
a = IntOption('a', '', 3)
b = IntOption('b', '', 1)
od = OptionDescription('od', '', [a, b])
a.impl_add_consistency('not_equal', b)
api = getapi(Config(od))
raises(ValueError, "api.option('a').value.set(1)")
api.option('a').value.set(2)
api.option('b').value.set(3)
owner = api.owner.get()
assert api.option('a').owner.get() == owner
raises(ValueError, "api.option('a').value.reset()")
assert api.option('a').owner.get() == owner
def test_consistency_ip_netmask():
a = IPOption('a', '')
b = NetmaskOption('b', '')
od = OptionDescription('od', '', [a, b])
b.impl_add_consistency('ip_netmask', a)
api = getapi(Config(od))
api.option('a').value.set('192.168.1.1')
api.option('b').value.set('255.255.255.0')
api.option('a').value.set('192.168.1.2')
api.option('b').value.set('255.255.255.255')
api.option('b').value.set('255.255.255.0')
raises(ValueError, "api.option('a').value.set('192.168.1.0')")
raises(ValueError, "api.option('a').value.set('192.168.1.255')")
def test_consistency_network_netmask():
a = NetworkOption('a', '')
b = NetmaskOption('b', '')
od = OptionDescription('od', '', [a, b])
b.impl_add_consistency('network_netmask', a)
api = getapi(Config(od))
api.option('a').value.set('192.168.1.1')
api.option('b').value.set('255.255.255.255')
api.option('b').value.reset()
api.option('a').value.set('192.168.1.0')
api.option('b').value.set('255.255.255.0')
raises(ValueError, "api.option('a').value.set('192.168.1.1')")
def test_consistency_ip_in_network():
a = NetworkOption('a', '')
b = NetmaskOption('b', '')
c = IPOption('c', '')
d = IPOption('d', '')
od = OptionDescription('od', '', [a, b, c, d])
c.impl_add_consistency('in_network', a, b)
d.impl_add_consistency('in_network', a, b, warnings_only=True)
warnings.simplefilter("always", ValueWarning)
api = getapi(Config(od))
api.option('a').value.set('192.168.1.0')
api.option('b').value.set('255.255.255.0')
api.option('c').value.set('192.168.1.1')
raises(ValueError, "api.option('c').value.set('192.168.2.1')")
raises(ValueError, "api.option('c').value.set('192.168.1.0')")
raises(ValueError, "api.option('c').value.set('192.168.1.255')")
with warnings.catch_warnings(record=True) as w:
api.option('d').value.set('192.168.2.1')
assert len(w) == 1
def test_consistency_ip_in_network_len_error():
a = NetworkOption('a', '')
b = NetmaskOption('b', '')
c = IPOption('c', '')
od = OptionDescription('od', '', [a, b, c])
raises(ConfigError, "c.impl_add_consistency('in_network', a)")
def test_consistency_ip_netmask_network_error():
a = IPOption('a', '')
b = NetworkOption('b', '')
c = NetmaskOption('c', '')
od = OptionDescription('od', '', [a, b, c])
c.impl_add_consistency('ip_netmask', a, b)
api = getapi(Config(od))
api.option('a').value.set('192.168.1.1')
api.option('b').value.set('192.168.1.0')
raises(ConfigError, "api.option('c').value.set('255.255.255.0')")
def test_consistency_ip_netmask_error_multi():
a = IPOption('a', '', multi=True)
b = NetmaskOption('b', '')
OptionDescription('od', '', [a, b])
raises(ConfigError, "b.impl_add_consistency('ip_netmask', a)")
def test_consistency_ip_netmask_multi():
a = IPOption('a', '', multi=True)
b = NetmaskOption('b', '', multi=True)
od = MasterSlaves('a', '', [a, b])
b.impl_add_consistency('ip_netmask', a)
od2 = OptionDescription('od2', '', [od])
api = getapi(Config(od2))
api.option('a.a').value.set(['192.168.1.1'])
api.option('a.b', 0).value.set('255.255.255.0')
api.option('a.a').value.set(['192.168.1.2'])
api.option('a.b', 0).value.set('255.255.255.255')
api.option('a.b', 0).value.set('255.255.255.0')
raises(ValueError, "api.option('a.a').value.set(['192.168.1.0'])")
def test_consistency_network_netmask_multi():
a = NetworkOption('a', '', multi=True)
b = NetmaskOption('b', '', multi=True)
od = MasterSlaves('a', '', [a, b])
b.impl_add_consistency('network_netmask', a)
od2 = OptionDescription('od', '', [od])
api = getapi(Config(od2))
api.option('a.a').value.set(['192.168.1.1'])
api.option('a.b', 0).value.set('255.255.255.255')
api.option('a.b', 0).value.reset()
api.option('a.a').value.set(['192.168.1.0'])
api.option('a.b', 0).value.set('255.255.255.0')
raises(ValueError, "api.option('a.a').value.set(['192.168.1.1'])")
def test_consistency_network_netmask_multi_slave_default_multi():
a = NetworkOption('a', '', default_multi=u'192.168.1.0', multi=True, properties=('mandatory',))
b = NetmaskOption('b', '', default_multi=u'255.255.255.0', multi=True, properties=('mandatory',))
od = MasterSlaves('a', '', [a, b])
od2 = OptionDescription('od2', '', [od])
#od.impl_set_group_type(groups.master)
b.impl_add_consistency('network_netmask', a)
api = getapi(Config(od2))
api.property.read_write()
api.option('a.a').value.set([undefined])
assert api.option('a.a').value.get() == ['192.168.1.0']
assert api.option('a.b', 0).value.get() == '255.255.255.0'
def test_consistency_network_netmask_multi_slave_default():
a = NetworkOption('a', '', multi=True, properties=('mandatory',))
b = NetmaskOption('b', '', default_multi=u'255.255.255.0', multi=True, properties=('mandatory',))
od = MasterSlaves('a', '', [a, b])
#od.impl_set_group_type(groups.master)
b.impl_add_consistency('network_netmask', a)
od2 = OptionDescription('od2', '', [od])
api = getapi(Config(od2))
api.property.read_write()
api.property.pop('cache')
assert api.option('a.a').value.get() == []
api.option('a.a').value.set(['192.168.1.0'])
api.property.read_only()
assert api.option('a.a').value.get() == [u'192.168.1.0']
assert api.option('a.b', 0).value.get() == u'255.255.255.0'
api.property.read_write()
raises(ValueError, "api.option('a.a').value.set([u'192.168.1.0', u'192.168.1.1'])")
api.option('a.a').value.set(['192.168.1.0', undefined])
api.option('a.b', 0).value.set('255.255.255.0')
api.option('a.b', 1).value.set('255.255.255.255')
api.option('a.a').value.set([u'192.168.1.0', u'192.168.1.1'])
def return_netmask(*args, **kwargs):
return u'255.255.255.0'
def return_netmask2(master):
if master is not None:
if master.endswith('2.1'):
return u'255.255.255.0'
if not master.endswith('.0'):
return u'255.255.255.255'
return u'255.255.255.0'
def test_consistency_network_netmask_multi_slave_callback():
a = NetworkOption('a', '', multi=True, properties=('mandatory',))
b = NetmaskOption('b', '', callback=return_netmask, multi=True, properties=('mandatory',))
od = MasterSlaves('a', '', [a, b])
#od.impl_set_group_type(groups.master)
b.impl_add_consistency('network_netmask', a)
od2 = OptionDescription('od2', '', [od])
api = getapi(Config(od2))
api.property.read_write()
api.property.pop('cache')
assert api.option('a.a').value.get() == []
api.option('a.a').value.set(['192.168.1.0'])
api.property.read_only()
assert api.option('a.a').value.get() == [u'192.168.1.0']
assert api.option('a.b', 0).value.get() == '255.255.255.0'
api.property.read_write()
raises(ValueError, "assert api.option('a.a').value.set([u'192.168.1.0', u'192.168.1.1'])")
api.option('a.a').value.set(['192.168.1.0', undefined])
api.option('a.b', 0).value.set('255.255.255.0')
api.option('a.b', 1).value.set('255.255.255.255')
api.option('a.a').value.set(['192.168.1.0', '192.168.1.1'])
def test_consistency_network_netmask_multi_slave_callback_value():
a = NetworkOption('a', '', multi=True, properties=('mandatory',))
b = NetmaskOption('b', '', callback=return_netmask2, callback_params={'': ((a, False),)}, multi=True, properties=('mandatory',))
od = MasterSlaves('a', '', [a, b])
#od.impl_set_group_type(groups.master)
b.impl_add_consistency('network_netmask', a)
od2 = OptionDescription('od2', '', [od])
api = getapi(Config(od2))
api.property.read_write()
api.property.pop('cache')
assert api.option('a.a').value.get() == []
api.option('a.a').value.set(['192.168.1.0'])
assert api.option('a.a').value.get() == ['192.168.1.0']
assert api.option('a.b', 0).value.get() == '255.255.255.0'
raises(ValueError, "api.option('a.a').value.set(['192.168.1.0', '192.168.2.1'])")
assert api.option('a.a').value.get() == [u'192.168.1.0']
assert api.option('a.b', 0).value.get() == '255.255.255.0'
raises(ValueError, "api.option('a.a').value.set(['192.168.2.1'])")
assert api.option('a.a').value.get() == [u'192.168.1.0']
assert api.option('a.b', 0).value.get() == '255.255.255.0'
api.option('a.a').value.set(['192.168.1.0', '192.168.1.1'])
api.option('a.b', 0).value.set('255.255.255.0')
api.option('a.b', 1).value.set('255.255.255.255')
def test_consistency_ip_netmask_multi_master():
a = IPOption('a', '', multi=True)
b = NetmaskOption('b', '', multi=True)
od = MasterSlaves('a', '', [a, b])
#od.impl_set_group_type(groups.master)
b.impl_add_consistency('ip_netmask', a)
od2 = OptionDescription('od2', '', [od])
api = getapi(Config(od2))
api.option('a.a').value.set(['192.168.1.1'])
api.option('a.b', 0).value.set('255.255.255.0')
api.option('a.a').value.set(['192.168.1.2'])
api.option('a.b', 0).value.set('255.255.255.255')
api.option('a.b', 0).value.set('255.255.255.0')
raises(ValueError, "api.option('a.a').value.set(['192.168.1.0'])")
api.option('a.a').value.set(['192.168.1.128'])
raises(ValueError, "api.option('a.b', 0).value.set('255.255.255.128')")
api.option('a.a').value.set(['192.168.1.2', '192.168.1.3'])
def test_consistency_network_netmask_multi_master():
a = NetworkOption('a', '', multi=True)
b = NetmaskOption('b', '', multi=True)
od = MasterSlaves('a', '', [a, b])
#od.impl_set_group_type(groups.master)
b.impl_add_consistency('network_netmask', a)
od2 = OptionDescription('od2', '', [od])
api = getapi(Config(od2))
api.option('a.a').value.set(['192.168.1.1'])
api.option('a.b', 0).value.set('255.255.255.255')
api.option('a.b', 0).value.reset()
api.option('a.a').value.set(['192.168.1.0'])
api.option('a.b', 0).value.set('255.255.255.0')
raises(ValueError, "api.option('a.a').value.set(['192.168.1.1'])")
def test_consistency_broadcast():
a = NetworkOption('a', '', multi=True)
b = NetmaskOption('b', '', multi=True)
c = BroadcastOption('c', '', multi=True)
od = MasterSlaves('a', '', [a, b, c])
#od.impl_set_group_type(groups.master)
b.impl_add_consistency('network_netmask', a)
c.impl_add_consistency('broadcast', a, b)
od2 = OptionDescription('od2', '', [od])
api = getapi(Config(od2))
#first, test network_netmask
api.option('a.a').value.set(['192.168.1.128'])
raises(ValueError, "api.option('a.a').value.set(['255.255.255.0'])")
#
api.option('a.a').value.set(['192.168.1.0'])
api.option('a.b', 0).value.set('255.255.255.0')
api.option('a.c', 0).value.set('192.168.1.255')
raises(ValueError, "api.option('a.a').value.set(['192.168.1.1'])")
#
api.option('a.a').value.set(['192.168.1.0', '192.168.2.128'])
api.option('a.b', 0).value.set('255.255.255.0')
api.option('a.b', 1).value.set('255.255.255.128')
api.option('a.c', 0).value.set('192.168.1.255')
api.option('a.c', 1).value.set('192.168.2.255')
raises(ValueError, "api.option('a.c', 1).value.set('192.168.2.128')")
api.option('a.c', 1).value.set('192.168.2.255')
def test_consistency_broadcast_error():
a = NetworkOption('a', '', multi=True)
b = NetmaskOption('b', '', multi=True)
c = BroadcastOption('c', '', multi=True)
od = MasterSlaves('a', '', [a, b, c])
od2 = OptionDescription('od2', '', [od])
b.impl_add_consistency('network_netmask', a)
c.impl_add_consistency('broadcast', a)
api = getapi(Config(od2))
raises(ConfigError, "api.option('a.a').value.set(['192.168.1.0'])")
def test_consistency_broadcast_warnings():
warnings.simplefilter("always", ValueWarning)
a = NetworkOption('a', '', properties=('mandatory', 'disabled'))
b = NetmaskOption('b', '', properties=('mandatory', 'disabled'))
c = NetmaskOption('c', '', properties=('mandatory', 'disabled'))
od = OptionDescription('a', '', [a, b, c])
b.impl_add_consistency('network_netmask', a, warnings_only=True)
api = getapi(Config(od))
with warnings.catch_warnings(record=True) as w:
api.option('a').value.set('192.168.1.4')
api.option('b').value.set('255.255.255.0')
assert len(w) == 1
api.property.read_write()
with warnings.catch_warnings(record=True) as w:
list(api.value.mandatory_warnings())
assert len(w) == 0
def test_consistency_broadcast_default_1():
a = NetworkOption('a', '', '192.168.1.0')
b = NetmaskOption('b', '', '255.255.255.128')
c = BroadcastOption('c', '', '192.168.2.127')
od = OptionDescription('a', '', [a, b, c])
od
raises(ValueError, "c.impl_add_consistency('broadcast', a, b)")
def test_consistency_broadcast_default_2():
a = NetworkOption('a', '', '192.168.1.0')
b = NetmaskOption('b', '', '255.255.255.128')
d = BroadcastOption('d', '', '192.168.1.127')
od2 = OptionDescription('a', '', [a, b, d])
od2
d.impl_add_consistency('broadcast', a, b)
def test_consistency_not_all():
#_cache_consistencies is not None by not options has consistencies
a = NetworkOption('a', '', multi=True)
b = NetmaskOption('b', '', multi=True)
c = BroadcastOption('c', '', multi=True)
od = MasterSlaves('a', '', [a, b, c])
#od.impl_set_group_type(groups.master)
b.impl_add_consistency('network_netmask', a)
od2 = OptionDescription('od2', '', [od])
api = getapi(Config(od2))
api.option('a.a').value.set(['192.168.1.0'])
api.option('a.b', 0).value.set('255.255.255.0')
api.option('a.c', 0).value.set('192.168.1.255')
def test_consistency_permissive():
a = IntOption('a', '', 1)
b = IntOption('b', '', 2, properties=('hidden',))
od = OptionDescription('od', '', [a, b])
a.impl_add_consistency('not_equal', b)
api = getapi(Config(od))
api.property.read_write()
api.permissive.set(('hidden',))
api.option('a').value.set(1)
def test_consistency_disabled():
a = IntOption('a', '')
b = IntOption('b', '', properties=('disabled',))
od = OptionDescription('od', '', [a, b])
a.impl_add_consistency('not_equal', b)
api = getapi(Config(od))
api.property.read_write()
raises(PropertiesOptionError, "api.option('a').value.set(1)")
def test_consistency_disabled_transitive():
a = IntOption('a', '')
b = IntOption('b', '', properties=('disabled',))
od = OptionDescription('od', '', [a, b])
a.impl_add_consistency('not_equal', b, transitive=False)
api = getapi(Config(od))
api.property.read_write()
api.option('a').value.set(1)
def test_consistency_disabled_transitive_2():
a = IPOption('a', '')
b = IPOption('b', '')
c = NetworkOption('c', '', default='192.168.1.0')
d = NetmaskOption('d', '', default='255.255.255.0', properties=('disabled',))
od = OptionDescription('od', '', [a, b, c, d])
a.impl_add_consistency('not_equal', b)
a.impl_add_consistency('in_network', c, d, transitive=False)
api = getapi(Config(od))
api.property.read_write()
api.option('a').value.set('192.168.1.1')
raises(ValueError, "api.option('b').value.set('192.168.1.1')")
api.option('a').value.set('192.168.2.1')
#
api.option('a').value.set('192.168.1.1')
api.property.pop('disabled')
raises(ValueError, "api.option('a').value.set('192.168.2.1')")
def return_val(*args, **kwargs):
return '192.168.1.1'
def test_consistency_with_callback():
a = NetworkOption('a', '', default='192.168.1.0')
b = NetmaskOption('b', '', default='255.255.255.0')
c = IPOption('c', '', callback=return_val, callback_params={'': ((a, False),)})
od = OptionDescription('od', '', [a, b, c])
c.impl_add_consistency('in_network', a, b)
api = getapi(Config(od))
api.option('c').value.get()
def test_consistency_double_warnings():
a = IntOption('a', '')
b = IntOption('b', '', 1)
c = IntOption('c', '', 1)
od = OptionDescription('od', '', [a, b, c])
warnings.simplefilter("always", ValueWarning)
a.impl_add_consistency('not_equal', b, warnings_only=True)
a.impl_add_consistency('not_equal', c, warnings_only=True)
od2 = OptionDescription('od2', '', [od])
api = getapi(Config(od2))
with warnings.catch_warnings(record=True) as w:
api.option('od.a').value.set(1)
assert w != []
assert len(w) == 2
with warnings.catch_warnings(record=True) as w:
api.option('od.c').value.set(2)
assert len(w) == 1
with warnings.catch_warnings(record=True) as w:
api.option('od.a').value.set(2)
assert w != []
assert len(w) == 1
api.property.pop('warnings')
with warnings.catch_warnings(record=True) as w:
api.option('od.a').value.set(1)
assert w == []
def test_consistency_warnings_error():
a = IntOption('a', '')
b = IntOption('b', '', 1)
c = IntOption('c', '', 1)
od = OptionDescription('od', '', [a, b, c])
warnings.simplefilter("always", ValueWarning)
a.impl_add_consistency('not_equal', b, warnings_only=True)
a.impl_add_consistency('not_equal', c)
api = getapi(Config(od))
with warnings.catch_warnings(record=True) as w:
raises(ValueError, "api.option('a').value.set(1)")
assert w == []
#def test_consistency_network_netmask_mandatory():
# a = NetworkOption('a', '', multi=True, properties=('mandatory',), default=[u'0.0.0.0'])
# b = NetmaskOption('b', '', multi=True, properties=('mandatory',), default_multi=u'0.0.0.0')
# od = MasterSlaves('a', '', [a, b])
# b.impl_add_consistency('network_netmask', a)
# od2 = OptionDescription('od2', '', [od])
# api = getapi(Config(od2))
# api.property.read_only()
# api.property.pop('mandatory')
# api.option.make_dict()
#
#
def test_consistency_has_dependency():
a = IPOption('a', '')
b = NetmaskOption('b', '')
od = OptionDescription('od', '', [a, b])
b.impl_add_consistency('ip_netmask', a)
api = getapi(Config(od))
assert api.option('a').option.has_dependency() is True
assert api.option('b').option.has_dependency() is True
assert api.option('a').option.has_dependency(False) is True
assert api.option('b').option.has_dependency(False) is True
def test_consistency_not_equal_has_dependency():
a = IntOption('a', '')
b = IntOption('b', '')
od = OptionDescription('od', '', [a, b])
a.impl_add_consistency('not_equal', b)
api = getapi(Config(od))
assert api.option('a').option.has_dependency() is False
assert api.option('b').option.has_dependency() is False
assert api.option('a').option.has_dependency(False) is True
assert api.option('b').option.has_dependency(False) is True

View File

@ -354,7 +354,8 @@ class SubConfig(object):
name, name,
index, index,
config_bag, config_bag,
returns_option=False): returns_option=False,
iter_slave=False):
""" """
attribute notation mechanism for accessing the value of an option attribute notation mechanism for accessing the value of an option
:param name: attribute name :param name: attribute name
@ -398,11 +399,11 @@ class SubConfig(object):
subpath) subpath)
if option.impl_is_master_slaves('slave'): if option.impl_is_master_slaves('slave'):
if index is None: if index is None and not iter_slave:
raise IndexError(_('index is mandatory for the slave "{}"' raise IndexError(_('index is mandatory for the slave "{}"'
'').format(subpath)) '').format(subpath))
length = self.cfgimpl_get_length() length = self.cfgimpl_get_length()
if index >= length: if index is not None and index >= length:
raise IndexError(_('index ({}) is higher than the master length ({}) ' raise IndexError(_('index ({}) is higher than the master length ({}) '
'for "{}"').format(index, 'for "{}"').format(index,
length, length,
@ -417,13 +418,18 @@ class SubConfig(object):
elif index: elif index:
raise IndexError(_('index is forbidden for the not slave "{}"' raise IndexError(_('index is forbidden for the not slave "{}"'
'').format(subpath)) '').format(subpath))
#FIXME deja fiat dans get_cached_value if option.impl_is_master_slaves('slave') and index is None:
#if config_bag.validate: value = []
# option.impl_validate(context, length = self.cfgimpl_get_length()
# config_bag) for idx in range(length):
value = self.cfgimpl_get_values().get_cached_value(subpath, config_bag.properties = None
index, value.append(self.getattr(name,
config_bag) idx,
config_bag))
else:
value = self.cfgimpl_get_values().get_cached_value(subpath,
index,
config_bag)
if config_bag.validate_properties: if config_bag.validate_properties:
self.cfgimpl_get_settings().validate_mandatory(subpath, self.cfgimpl_get_settings().validate_mandatory(subpath,
index, index,

View File

@ -454,10 +454,9 @@ class BaseOption(Base):
elif name != '_readonly': elif name != '_readonly':
is_readonly = self.impl_is_readonly() is_readonly = self.impl_is_readonly()
if is_readonly: if is_readonly:
raise AttributeError(_("'{0}' ({1}) object attribute '{2}' is" raise AttributeError(_('"{}" ({}) object attribute "{}" is'
" read-only").format(self.__class__.__name__, ' read-only').format(self.__class__.__name__,
self, self.impl_get_display_name(),
#self.impl_getname(),
name)) name))
super(BaseOption, self).__setattr__(name, value) super(BaseOption, self).__setattr__(name, value)

View File

@ -51,7 +51,7 @@ class BroadcastOption(Option):
vals, vals,
warnings_only): warnings_only):
if len(vals) != 3: if len(vals) != 3:
return ConfigError(_('invalid len for vals')) raise ConfigError(_('invalid len for vals'))
if None in vals: if None in vals:
return return
broadcast, network, netmask = vals broadcast, network, netmask = vals

View File

@ -103,7 +103,7 @@ class IPOption(Option):
vals, vals,
warnings_only): warnings_only):
if len(vals) != 3: if len(vals) != 3:
return ConfigError(_('invalid len for vals')) raise ConfigError(_('invalid len for vals'))
if None in vals: if None in vals:
return return
ip, network, netmask = vals ip, network, netmask = vals

View File

@ -81,7 +81,7 @@ class NetmaskOption(Option):
make_net, make_net,
warnings_only): warnings_only):
if len(opts) != 2: if len(opts) != 2:
return ConfigError(_('invalid len for opts')) raise ConfigError(_('invalid len for opts'))
msg = None msg = None
try: try:
ip = IP('{0}/{1}'.format(val_ipnetwork, val_netmask), make_net=make_net) ip = IP('{0}/{1}'.format(val_ipnetwork, val_netmask), make_net=make_net)
@ -98,4 +98,4 @@ class NetmaskOption(Option):
msg = _('with netmask "{0}" ("{1}")') msg = _('with netmask "{0}" ("{1}")')
if msg is not None: if msg is not None:
raise ValueError(msg.format(val_netmask, raise ValueError(msg.format(val_netmask,
opts[1].impl_get_diplay_name())) opts[1].impl_get_display_name()))

View File

@ -20,7 +20,6 @@
# the whole pypy projet is under MIT licence # the whole pypy projet is under MIT licence
# ____________________________________________________________ # ____________________________________________________________
import warnings import warnings
import sys
import weakref import weakref
from .baseoption import OnlyOption, submulti, validate_calculator, STATIC_TUPLE from .baseoption import OnlyOption, submulti, validate_calculator, STATIC_TUPLE
@ -33,9 +32,6 @@ from ..error import (ConfigError, ValueWarning, PropertiesOptionError,
from itertools import combinations from itertools import combinations
ALLOWED_CONST_LIST = ['_cons_not_equal'] ALLOWED_CONST_LIST = ['_cons_not_equal']
if sys.version_info[0] >= 3: # pragma: no cover
xrange = range
class Option(OnlyOption): class Option(OnlyOption):
""" """
@ -177,6 +173,7 @@ class Option(OnlyOption):
def _launch_consistency(self, def _launch_consistency(self,
current_opt, current_opt,
func, func,
cons_id,
option, option,
value, value,
context, context,
@ -206,73 +203,96 @@ class Option(OnlyOption):
""" """
if context is not undefined: if context is not undefined:
descr = context.cfgimpl_get_description() descr = context.cfgimpl_get_description()
if config_bag is not undefined and config_bag.fromconsistency == option: if config_bag is not undefined and cons_id in config_bag.fromconsistency:
return return
all_cons_vals = [] all_cons_vals = []
all_cons_opts = [] all_cons_opts = []
length = None
for opt in opts: for opt in opts:
if isinstance(opt, weakref.ReferenceType): if isinstance(opt, weakref.ReferenceType):
opt = opt() opt = opt()
if config_bag is not undefined:
if config_bag.fromconsistency == opt:
break
sconfig_bag = config_bag.copy('nooption')
sconfig_bag.option = opt
sconfig_bag.fromconsistency = option
else:
sconfig_bag = undefined
if option == opt: if option == opt:
# option is current option # option is current option
# we have already value, so use it # we have already value, so use it
all_cons_vals.append(value) opt_value = value
all_cons_opts.append(opt) elif context is undefined:
opt_value = opt.impl_getdefault()
else: else:
path = None
#if context, calculate value, otherwise get default value #if context, calculate value, otherwise get default value
if context is not undefined: sconfig_bag = config_bag.copy('nooption')
if isinstance(opt, DynSymLinkOption): sconfig_bag.option = opt
path = opt.impl_getpath(context) sconfig_bag.fromconsistency.append(cons_id)
else: sconfig_bag.force_permissive = True
#FIXME ca devrait etre impl_getpath ??) path = opt.impl_getpath(context)
path = descr.impl_get_path_by_opt(opt) if opt.impl_is_master_slaves('slave'):
try: index_ = index
opt_value = context.getattr(path,
index,
sconfig_bag)
except PropertiesOptionError as err:
if debug: # pragma: no cover
log.debug('propertyerror in _launch_consistency: {0}'.format(err))
if transitive:
err.set_orig_opt(option)
raise err
else:
opt_value = None
#elif index is None:
else: else:
opt_value = opt.impl_getdefault() index_ = None
if index is not None: try:
if len(opt_value) <= index: opt_value = context.getattr(path,
opt_value = opt.impl_getdefault_multi() index_,
else: sconfig_bag,
opt_value = opt_value[index] iter_slave=True)
except PropertiesOptionError as err:
if debug: # pragma: no cover
log.debug('propertyerror in _launch_consistency: {0}'.format(err))
if transitive:
err.set_orig_opt(option)
raise err
opt_value = None
if not option == opt and opt_value is not None and index is not None and \
(context is undefined or \
not opt.impl_is_master_slaves('slave')):
if len(opt_value) <= index:
opt_value = opt.impl_getdefault_multi()
else:
opt_value = opt_value[index]
is_multi = self.impl_is_multi() if opt.impl_is_multi() and index is None and func not in ALLOWED_CONST_LIST:
if is_multi and index is None: if length is not None and length != len(opt_value):
# only check propertyerror for master/slaves is transitive raise ValueError(_('unexpected length of "{}" in constency "{}", should be "{}"'
break '').format(len(opt_value),
if is_multi and isinstance(opt_value, list): opt.impl_get_display_name(),
all_cons_vals.extend(opt_value) length))
for len_ in xrange(len(opt_value)):
all_cons_opts.append(opt)
else: else:
all_cons_vals.append(opt_value) length = len(opt_value)
all_cons_opts.append(opt) is_multi = True
else:
is_multi = False
if isinstance(opt_value, list) and func in ALLOWED_CONST_LIST:
for value_ in opt_value:
if isinstance(value_, list):
for val in value_:
all_cons_vals.append((False, val))
all_cons_opts.append(opt)
else:
all_cons_vals.append((False, value_))
all_cons_opts.append(opt)
else:
all_cons_vals.append((is_multi, opt_value))
all_cons_opts.append(opt)
else: else:
try: try:
getattr(self, func)(current_opt, all_values = []
all_cons_opts, if length is None:
all_cons_vals, all_value = []
warnings_only) for is_multi, values in all_cons_vals:
all_value.append(values)
all_values = [all_value]
else:
for idx in range(length):
all_value = []
for is_multi, values in all_cons_vals:
if not is_multi:
all_value.append(values)
else:
all_value.append(values[idx])
all_values.append(all_value)
for values in all_values:
getattr(self, func)(current_opt,
all_cons_opts,
values,
warnings_only)
except ValueError as err: except ValueError as err:
if warnings_only: if warnings_only:
msg = _('attention, "{0}" could be an invalid {1} for "{2}", {3}' msg = _('attention, "{0}" could be an invalid {1} for "{2}", {3}'
@ -322,7 +342,7 @@ class Option(OnlyOption):
if config_bag is not undefined and \ if config_bag is not undefined and \
((check_error is True and not 'validator' in config_bag.setting_properties) or \ ((check_error is True and not 'validator' in config_bag.setting_properties) or \
(check_error is False and not 'warnings' in config_bag.setting_properties)): (check_error is False and not 'warnings' in config_bag.setting_properties)):
return return
def _is_not_unique(value): def _is_not_unique(value):
@ -367,117 +387,103 @@ class Option(OnlyOption):
'which must not be a list').format(_value, 'which must not be a list').format(_value,
self.impl_get_display_name())) self.impl_get_display_name()))
#FIXME a revoir ... #FIXME a revoir ...
is_warnings_only = getattr(self, '_warnings_only', False) if _value is not None:
try:
if _value is not None:
if check_error:
# option validation
if config_bag is undefined:
setting_properties = None
else:
setting_properties = config_bag.setting_properties
self._validate(_value,
config_bag,
current_opt)
if ((check_error and not is_warnings_only) or
(not check_error and is_warnings_only)):
calculation_validator(_value,
_index)
self._second_level_validation(_value,
is_warnings_only)
self._valid_consistency(current_opt,
_value,
context,
_index,
check_error,
config_bag)
except ValueError as err:
if debug: # pragma: no cover
log.debug('do_validation: value: {0}, index: {1}:'
' {2}'.format(_value,
_index,
err),
exc_info=True)
if is_warnings_only:
msg = _('attention, "{0}" could be an invalid {1} for "{2}"'
'').format(_value,
self._display_name,
self.impl_get_display_name())
else:
msg = _('"{0}" is an invalid {1} for "{2}"'
'').format(_value,
self._display_name,
self.impl_get_display_name())
err_msg = '{0}'.format(err)
if err_msg:
msg += ', {}'.format(err_msg)
if check_error: if check_error:
raise ValueError(msg) # option validation
else: if config_bag is undefined:
warnings.warn_explicit(ValueWarning(msg, self), setting_properties = None
ValueWarning, else:
self.__class__.__name__, 0) setting_properties = config_bag.setting_properties
self._validate(_value,
config_bag,
current_opt)
if ((check_error and not is_warnings_only) or
(not check_error and is_warnings_only)):
calculation_validator(_value,
_index)
self._second_level_validation(_value,
is_warnings_only)
if is_multi is None: if is_multi is None:
is_multi = self.impl_is_multi() is_multi = self.impl_is_multi()
if not is_multi: is_warnings_only = getattr(self, '_warnings_only', False)
do_validation(value, None) try:
elif force_index is not None: val = value
if self.impl_is_submulti(): if not is_multi:
do_validation(val, None)
elif force_index is not None:
if self.impl_is_submulti():
_is_not_unique(value)
for idx, val in enumerate(value):
do_validation(val,
force_index)
else:
if multi is not None and self.impl_is_unique() and value in multi:
if not self.impl_is_submulti() and len(multi) - 1 >= force_index:
lst = list(multi)
lst.pop(force_index)
else:
lst = multi
if value in lst:
raise ValueError(_('invalid value "{}", this value is already'
' in "{}"').format(value,
self.impl_get_display_name()))
do_validation(val,
force_index)
elif not isinstance(value, list):
raise ValueError(_('invalid value "{0}" for "{1}" which '
'must be a list').format(value,
self.impl_getname()))
elif self.impl_is_submulti():
for idx, lval in enumerate(value):
_is_not_unique(lval)
if not isinstance(lval, list):
raise ValueError(_('invalid value "{0}" for "{1}" '
'which must be a list of list'
'').format(lval,
self.impl_getname()))
for val in lval:
do_validation(val,
idx)
else:
_is_not_unique(value) _is_not_unique(value)
for idx, val in enumerate(value):
do_validation(val,
force_index)
else:
if multi is not None and self.impl_is_unique() and value in multi:
if not self.impl_is_submulti() and len(multi) - 1 >= force_index:
lst = list(multi)
lst.pop(force_index)
else:
lst = multi
if value in lst:
raise ValueError(_('invalid value "{}", this value is already'
' in "{}"').format(value,
self.impl_get_display_name()))
do_validation(value,
force_index)
elif not isinstance(value, list):
raise ValueError(_('invalid value "{0}" for "{1}" which '
'must be a list').format(value,
self.impl_getname()))
elif self.impl_is_submulti():
if value:
for idx, val in enumerate(value):
_is_not_unique(val)
if not isinstance(val, list):
raise ValueError(_('invalid value "{0}" for "{1}" '
'which must be a list of list'
'').format(val,
self.impl_getname()))
for slave_val in val:
do_validation(slave_val,
idx)
else:
self._valid_consistency(current_opt,
None,
context,
None,
check_error,
config_bag)
else:
_is_not_unique(value)
if value:
for idx, val in enumerate(value): for idx, val in enumerate(value):
do_validation(val, do_validation(val,
idx) idx)
self._valid_consistency(current_opt,
value,
context,
force_index,
check_error,
config_bag)
except ValueError as err:
if debug: # pragma: no cover
log.debug('do_validation: value: {0}, index: {1}:'
' {2}'.format(val,
_index,
err),
exc_info=True)
if is_warnings_only:
msg = _('attention, "{0}" could be an invalid {1} for "{2}"'
'').format(val,
self._display_name,
self.impl_get_display_name())
else: else:
self._valid_consistency(current_opt, msg = _('"{0}" is an invalid {1} for "{2}"'
None, '').format(val,
context, self._display_name,
None, self.impl_get_display_name())
check_error, err_msg = '{0}'.format(err)
config_bag) if err_msg:
msg += ', {}'.format(err_msg)
if check_error:
raise ValueError(msg)
else:
warnings.warn_explicit(ValueWarning(msg, self),
ValueWarning,
self.__class__.__name__, 0)
def impl_is_dynsymlinkoption(self): def impl_is_dynsymlinkoption(self):
return False return False
@ -619,7 +625,7 @@ class Option(OnlyOption):
else: else:
consistencies = option._get_consistencies() consistencies = option._get_consistencies()
if consistencies is not None: if consistencies is not None:
for func, all_cons_opts, params in consistencies: for cons_id, func, all_cons_opts, params in consistencies:
warnings_only = params.get('warnings_only', False) warnings_only = params.get('warnings_only', False)
if (warnings_only and not check_error) or (not warnings_only and check_error): if (warnings_only and not check_error) or (not warnings_only and check_error):
transitive = params.get('transitive', True) transitive = params.get('transitive', True)
@ -636,6 +642,7 @@ class Option(OnlyOption):
wopt = opts[0]() wopt = opts[0]()
wopt._launch_consistency(self, wopt._launch_consistency(self,
func, func,
cons_id,
option, option,
value, value,
context, context,
@ -650,7 +657,7 @@ class Option(OnlyOption):
opts, opts,
vals, vals,
warnings_only): warnings_only):
equal = list() equal = []
is_current = False is_current = False
for idx_inf, val_inf in enumerate(vals): for idx_inf, val_inf in enumerate(vals):
for idx_sup, val_sup in enumerate(vals[idx_inf + 1:]): for idx_sup, val_sup in enumerate(vals[idx_inf + 1:]):
@ -659,7 +666,8 @@ class Option(OnlyOption):
if opt_ == current_opt: if opt_ == current_opt:
is_current = True is_current = True
else: else:
equal.append(opt_) if opt_ not in equal:
equal.append(opt_)
if equal: if equal:
if debug: # pragma: no cover if debug: # pragma: no cover
log.debug(_('_cons_not_equal: {} are not different').format(display_list(equal))) log.debug(_('_cons_not_equal: {} are not different').format(display_list(equal)))
@ -742,7 +750,7 @@ class Option(OnlyOption):
func, func,
all_cons_opts, all_cons_opts,
params): params):
cons = (func, all_cons_opts, params) cons = (None, func, all_cons_opts, params)
consistencies = getattr(self, '_consistencies', None) consistencies = getattr(self, '_consistencies', None)
if consistencies is None: if consistencies is None:
self._consistencies = [cons] self._consistencies = [cons]
@ -755,8 +763,14 @@ class Option(OnlyOption):
def _get_consistencies(self): def _get_consistencies(self):
return getattr(self, '_consistencies', STATIC_TUPLE) return getattr(self, '_consistencies', STATIC_TUPLE)
def _has_consistencies(self): def _has_consistencies(self, context):
return hasattr(self, '_consistencies') if context is undefined:
return False
descr = context.cfgimpl_get_description()
if descr._cache_consistencies is None:
return False
return self in descr._cache_consistencies
class RegexpOption(Option): class RegexpOption(Option):

View File

@ -36,6 +36,7 @@ class CacheOptionDescription(BaseOption):
config, config,
path='', path='',
_consistencies=None, _consistencies=None,
_consistencies_id=0,
cache_option=None, cache_option=None,
force_store_values=None, force_store_values=None,
_dependencies=None): _dependencies=None):
@ -66,6 +67,7 @@ class CacheOptionDescription(BaseOption):
option._build_cache(config, option._build_cache(config,
subpath, subpath,
_consistencies, _consistencies,
_consistencies_id,
cache_option, cache_option,
force_store_values, force_store_values,
_dependencies) _dependencies)
@ -74,7 +76,7 @@ class CacheOptionDescription(BaseOption):
is_multi = option.impl_is_multi() is_multi = option.impl_is_multi()
if not option.impl_is_symlinkoption() and 'force_store_value' in option.impl_getproperties(): if not option.impl_is_symlinkoption() and 'force_store_value' in option.impl_getproperties():
force_store_values.append((subpath, option)) force_store_values.append((subpath, option))
for func, all_cons_opts, params in option._get_consistencies(): for cons_id, func, all_cons_opts, params in option._get_consistencies():
option._valid_consistencies(all_cons_opts[1:], init=False) option._valid_consistencies(all_cons_opts[1:], init=False)
if func not in ALLOWED_CONST_LIST and is_multi: if func not in ALLOWED_CONST_LIST and is_multi:
is_masterslaves = option.impl_is_master_slaves() is_masterslaves = option.impl_is_master_slaves()
@ -95,9 +97,11 @@ class CacheOptionDescription(BaseOption):
'must be in same master/slaves for "{1}"').format( 'must be in same master/slaves for "{1}"').format(
option.impl_getname(), opt.impl_getname())) option.impl_getname(), opt.impl_getname()))
_consistencies.setdefault(weak_opt, _consistencies.setdefault(weak_opt,
[]).append((func, []).append((_consistencies_id,
func,
all_cons_opts, all_cons_opts,
params)) params))
_consistencies_id += 1
# if context is set to callback, must be reset each time a value change # if context is set to callback, must be reset each time a value change
if hasattr(option, '_has_calc_context'): if hasattr(option, '_has_calc_context'):
self._add_dependency(option) self._add_dependency(option)

View File

@ -81,8 +81,8 @@ class SymLinkOption(OnlyOption):
def _get_consistencies(self): def _get_consistencies(self):
return () return ()
def _has_consistencies(self): def _has_consistencies(self, context):
return False return option._opt._has_consistencies(context)
class DynSymLinkOption(object): class DynSymLinkOption(object):

View File

@ -142,6 +142,7 @@ class ConfigBag(object):
'trusted_cached_properties': True, 'trusted_cached_properties': True,
} }
self.config = config self.config = config
self.fromconsistency = []
for key, value in kwargs.items(): for key, value in kwargs.items():
if value != self.default.get(key): if value != self.default.get(key):
setattr(self, key, value) setattr(self, key, value)
@ -160,7 +161,9 @@ class ConfigBag(object):
if filters == 'nooption' and (key.startswith('option') or \ if filters == 'nooption' and (key.startswith('option') or \
key == 'properties'): key == 'properties'):
continue continue
if key != 'default': if key == 'fromconsistency':
kwargs['fromconsistency'] = copy(self.fromconsistency)
elif key != 'default':
value = getattr(self, key) value = getattr(self, key)
if value != self.default.get(key): if value != self.default.get(key):
kwargs[key] = value kwargs[key] = value

View File

@ -129,36 +129,6 @@ class Values(object):
# and return it # and return it
return value return value
def get_validated_value(self,
path,
index,
config_bag):
"""get value and validate it
index is None for slave value, if value returned is not a list, just return []
:param path: the path of the `Option`
:param index: index for a slave `Option`
:returns: value
"""
value = self.getvalue(path,
index,
config_bag)
context = self._getcontext()
opt = config_bag.option
opt.impl_validate(value,
context=context,
force_index=index,
check_error=True,
config_bag=config_bag)
if config_bag.display_warnings:
opt.impl_validate(value,
context=context,
force_index=index,
check_error=False,
config_bag=config_bag)
return value
def getvalue(self, def getvalue(self,
path, path,
index, index,
@ -360,7 +330,7 @@ class Values(object):
context = self._getcontext() context = self._getcontext()
owner = context.cfgimpl_get_settings().getowner() owner = context.cfgimpl_get_settings().getowner()
if 'validator' in config_bag.setting_properties and config_bag.validate: if 'validator' in config_bag.setting_properties and config_bag.validate:
if config_bag.option._has_consistencies(): if config_bag.option._has_consistencies(context):
# set value to a fake config when option has dependency # set value to a fake config when option has dependency
# validation will be complet in this case (consistency, ...) # validation will be complet in this case (consistency, ...)
tested_context = context._gen_fake_values() tested_context = context._gen_fake_values()
@ -371,9 +341,10 @@ class Values(object):
value, value,
sconfig_bag, sconfig_bag,
True) True)
sconfig_bag.validate = True
tested_context.getattr(path, tested_context.getattr(path,
index, index,
config_bag) sconfig_bag)
else: else:
self.setvalue_validation(path, self.setvalue_validation(path,
index, index,