# coding: utf-8
"""Tests for the values cache and the settings (properties) cache of a Config.

NOTE: the order of the module-level statements below is significant:
``do_autopath()`` runs before the ``tiramisu`` imports, and ``expires_time``
is set to one second on both modules before the remaining imports, exactly as
in the original file.
"""
from .autopath import do_autopath
do_autopath()

from tiramisu import setting, value
setting.expires_time = 1
value.expires_time = 1
from tiramisu.option import BoolOption, IPOption, IntOption, StrOption, OptionDescription
from tiramisu.config import Config
from tiramisu.error import ConfigError
from tiramisu.setting import groups
from time import sleep, time
from py.test import raises


# Counter incremented by return_incr(); only test_callback_value_incr uses it.
incr = 0


def return_incr():
    """Callback returning a new, strictly increasing integer on each call."""
    global incr
    incr += 1
    return incr


def return_value(value=None):
    """Callback returning its single argument unchanged.

    Used with both positional (``{'': ...}``) and keyword (``{'value': ...}``)
    ``callback_params``, hence the parameter name and its default.
    """
    return value


def make_description():
    """Build an OptionDescription 'od1' holding u1 (multi), u2 and u3 (multi)."""
    u1 = IntOption('u1', '', multi=True)
    u2 = IntOption('u2', '')
    u3 = IntOption('u3', '', multi=True)
    return OptionDescription('od1', '', [u1, u2, u3])


def test_cache_config():
    """Creating a Config builds the caches of its OptionDescription."""
    od1 = make_description()
    assert od1.impl_already_build_caches() is False
    c = Config(od1)
    assert od1.impl_already_build_caches() is True
    # bare reference kept from the original (presumably to mark `c` as used)
    c


def test_cache():
    """Reading an option stores it in both the values and settings caches."""
    od1 = make_description()
    c = Config(od1)
    values = c.cfgimpl_get_values()
    settings = c.cfgimpl_get_settings()
    c.u1
    assert 'u1' in values._p_.get_cached(c)
    assert 'u1' in settings._p_.get_cached(c)
    c.u2
    assert 'u1' in values._p_.get_cached(c)
    assert 'u1' in settings._p_.get_cached(c)
    assert 'u2' in values._p_.get_cached(c)
    assert 'u2' in settings._p_.get_cached(c)


def test_get_cache():
    """A value injected in the cache (with an expiry time) is what is read back."""
    # force a value in cache, then check that reading returns the injected value
    od1 = make_description()
    c = Config(od1)
    values = c.cfgimpl_get_values()
    settings = c.cfgimpl_get_settings()
    ntime = time() + 1
    settings._p_.setcache('u1', set(['inject']), ntime, None)
    assert 'inject' in settings[od1.u1]
    values._p_.setcache('u1', 100, ntime, None)
    assert c.u1 == [100]


def test_get_cache_no_expire():
    """Same as test_get_cache, with None given as the expiry time."""
    # force a value in cache, then check that reading returns the injected value
    od1 = make_description()
    c = Config(od1)
    values = c.cfgimpl_get_values()
    settings = c.cfgimpl_get_settings()
    settings._p_.setcache('u1', set(['inject2']), None, None)
    assert 'inject2' in settings[od1.u1]
    values._p_.setcache('u1', 200, None, None)
    assert c.u1 == [200]


def test_cache_reset():
    """Check which cache entries are dropped by value and property changes."""
    od1 = make_description()
    c = Config(od1)
    values = c.cfgimpl_get_values()
    settings = c.cfgimpl_get_settings()
    # when changing a value
    c.u1
    c.u2
    assert 'u1' in values._p_.get_cached(c)
    assert 'u1' in settings._p_.get_cached(c)
    assert 'u2' in values._p_.get_cached(c)
    assert 'u2' in settings._p_.get_cached(c)
    c.u2 = 1
    assert 'u1' in values._p_.get_cached(c)
    assert 'u1' in settings._p_.get_cached(c)
    assert 'u2' not in values._p_.get_cached(c)
    assert 'u2' in settings._p_.get_cached(c)
    # when removing a value
    c.u1
    assert 'u1' in values._p_.get_cached(c)
    assert 'u1' in settings._p_.get_cached(c)
    del c.u2
    assert 'u1' in values._p_.get_cached(c)
    assert 'u1' in settings._p_.get_cached(c)
    assert 'u2' not in values._p_.get_cached(c)
    assert 'u2' in settings._p_.get_cached(c)
    # when adding/removing a property on an option
    c.u1
    assert 'u1' in values._p_.get_cached(c)
    assert 'u1' in settings._p_.get_cached(c)
    c.cfgimpl_get_settings()[od1.u2].append('test')
    assert 'u1' in values._p_.get_cached(c)
    assert 'u1' in settings._p_.get_cached(c)
    assert 'u2' not in values._p_.get_cached(c)
    assert 'u2' not in settings._p_.get_cached(c)
    c.u1
    assert 'u1' in values._p_.get_cached(c)
    assert 'u1' in settings._p_.get_cached(c)
    c.cfgimpl_get_settings()[od1.u2].remove('test')
    assert 'u1' in values._p_.get_cached(c)
    assert 'u1' in settings._p_.get_cached(c)
    assert 'u2' not in values._p_.get_cached(c)
    assert 'u2' not in settings._p_.get_cached(c)
    # when enabling/disabling a global property
    c.u1
    assert 'u1' in values._p_.get_cached(c)
    assert 'u1' in settings._p_.get_cached(c)
    c.cfgimpl_get_settings().append('test')
    assert 'u1' not in values._p_.get_cached(c)
    assert 'u1' not in settings._p_.get_cached(c)
    c.u1
    assert 'u1' in values._p_.get_cached(c)
    assert 'u1' in settings._p_.get_cached(c)
    c.cfgimpl_get_settings().remove('test')
    assert 'u1' not in values._p_.get_cached(c)
    assert 'u1' not in settings._p_.get_cached(c)


def test_cache_reset_multi():
    """Same checks as test_cache_reset for a multi option (set/append/pop/del)."""
    od1 = make_description()
    c = Config(od1)
    values = c.cfgimpl_get_values()
    settings = c.cfgimpl_get_settings()
    c.u1
    c.u3
    assert 'u1' in values._p_.get_cached(c)
    assert 'u1' in settings._p_.get_cached(c)
    assert 'u3' in values._p_.get_cached(c)
    assert 'u3' in settings._p_.get_cached(c)
    # when changing a value
    c.u3 = [1]
    assert 'u1' in values._p_.get_cached(c)
    assert 'u1' in settings._p_.get_cached(c)
    assert 'u3' not in values._p_.get_cached(c)
    assert 'u3' in settings._p_.get_cached(c)
    # when appending a value
    c.u1
    c.u3
    assert 'u1' in values._p_.get_cached(c)
    assert 'u1' in settings._p_.get_cached(c)
    assert 'u3' in values._p_.get_cached(c)
    assert 'u3' in settings._p_.get_cached(c)
    c.u3.append(1)
    assert 'u1' in values._p_.get_cached(c)
    assert 'u1' in settings._p_.get_cached(c)
    assert 'u3' not in values._p_.get_cached(c)
    assert 'u3' in settings._p_.get_cached(c)
    # when popping a value
    c.u1
    c.u3
    assert 'u1' in values._p_.get_cached(c)
    assert 'u1' in settings._p_.get_cached(c)
    assert 'u3' in values._p_.get_cached(c)
    assert 'u3' in settings._p_.get_cached(c)
    c.u3.pop(1)
    assert 'u1' in values._p_.get_cached(c)
    assert 'u1' in settings._p_.get_cached(c)
    assert 'u3' not in values._p_.get_cached(c)
    assert 'u3' in settings._p_.get_cached(c)
    # when removing a value
    c.u1
    assert 'u1' in values._p_.get_cached(c)
    assert 'u1' in settings._p_.get_cached(c)
    del c.u3
    assert 'u1' in values._p_.get_cached(c)
    assert 'u1' in settings._p_.get_cached(c)
    assert 'u3' not in values._p_.get_cached(c)
    assert 'u3' in settings._p_.get_cached(c)


def test_reset_cache():
    """cfgimpl_reset_cache() without argument empties both caches."""
    od1 = make_description()
    c = Config(od1)
    values = c.cfgimpl_get_values()
    settings = c.cfgimpl_get_settings()
    c.u1
    assert 'u1' in values._p_.get_cached(c)
    assert 'u1' in settings._p_.get_cached(c)
    c.cfgimpl_reset_cache()
    assert 'u1' not in values._p_.get_cached(c)
    assert 'u1' not in settings._p_.get_cached(c)
    c.u1
    # expires_time is 1 (set at module top); the sleeps let entries age
    sleep(1)
    c.u1
    sleep(1)
    c.u2
    assert 'u1' in values._p_.get_cached(c)
    assert 'u1' in settings._p_.get_cached(c)
    assert 'u2' in values._p_.get_cached(c)
    assert 'u2' in settings._p_.get_cached(c)
    c.cfgimpl_reset_cache()
    assert 'u1' not in values._p_.get_cached(c)
    assert 'u1' not in settings._p_.get_cached(c)
    assert 'u2' not in values._p_.get_cached(c)
    assert 'u2' not in settings._p_.get_cached(c)


def test_reset_cache_subconfig():
    """Resetting the cache from a sub-config drops the entry seen from the root."""
    od1 = make_description()
    od2 = OptionDescription('od2', '', [od1])
    c = Config(od2)
    values = c.cfgimpl_get_values()
    c.od1.u1
    assert 'od1.u1' in values._p_.get_cached(c)
    c.od1.cfgimpl_reset_cache()
    assert 'od1.u1' not in values._p_.get_cached(c)


def test_reset_cache_only_expired():
    """cfgimpl_reset_cache(True) keeps fresh entries and drops the aged one."""
    od1 = make_description()
    c = Config(od1)
    values = c.cfgimpl_get_values()
    settings = c.cfgimpl_get_settings()
    c.u1
    assert 'u1' in values._p_.get_cached(c)
    assert 'u1' in settings._p_.get_cached(c)
    c.cfgimpl_reset_cache(True)
    assert 'u1' in values._p_.get_cached(c)
    assert 'u1' in settings._p_.get_cached(c)
    # expires_time is 1 (set at module top); after the sleeps u1 is older
    # than u2, which is read just before the reset
    sleep(1)
    c.u1
    sleep(1)
    c.u2
    assert 'u1' in values._p_.get_cached(c)
    assert 'u1' in settings._p_.get_cached(c)
    assert 'u2' in values._p_.get_cached(c)
    assert 'u2' in settings._p_.get_cached(c)
    c.cfgimpl_reset_cache(True)
    assert 'u1' not in values._p_.get_cached(c)
    assert 'u1' not in settings._p_.get_cached(c)
    assert 'u2' in values._p_.get_cached(c)
    assert 'u2' in settings._p_.get_cached(c)


def test_cache_not_expire():
    """Without the 'expire' property, cfgimpl_reset_cache(True) drops nothing."""
    od1 = make_description()
    c = Config(od1)
    values = c.cfgimpl_get_values()
    settings = c.cfgimpl_get_settings()
    settings.remove('expire')
    c.u1
    assert 'u1' in values._p_.get_cached(c)
    assert 'u1' in settings._p_.get_cached(c)
    c.cfgimpl_reset_cache(True)
    assert 'u1' in values._p_.get_cached(c)
    assert 'u1' in settings._p_.get_cached(c)
    sleep(1)
    c.u2
    assert 'u1' in values._p_.get_cached(c)
    assert 'u1' in settings._p_.get_cached(c)
    assert 'u2' in values._p_.get_cached(c)
    assert 'u2' in settings._p_.get_cached(c)
    c.cfgimpl_reset_cache(True)
    assert 'u1' in values._p_.get_cached(c)
    assert 'u1' in settings._p_.get_cached(c)
    assert 'u2' in values._p_.get_cached(c)
    assert 'u2' in settings._p_.get_cached(c)


def test_cache_not_cache():
    """Without the 'cache' property, reading an option caches nothing."""
    od1 = make_description()
    c = Config(od1)
    values = c.cfgimpl_get_values()
    settings = c.cfgimpl_get_settings()
    settings.remove('cache')
    c.u1
    assert 'u1' not in values._p_.get_cached(c)
    assert 'u1' not in settings._p_.get_cached(c)


def test_reset_cache_only():
    """cfgimpl_reset_cache(only=...) resets just the named cache."""
    od1 = make_description()
    c = Config(od1)
    values = c.cfgimpl_get_values()
    settings = c.cfgimpl_get_settings()
    c.u1
    assert 'u1' in values._p_.get_cached(c)
    assert 'u1' in settings._p_.get_cached(c)
    c.cfgimpl_reset_cache(only=('values',))
    assert 'u1' not in values._p_.get_cached(c)
    assert 'u1' in settings._p_.get_cached(c)
    c.u1
    assert 'u1' in values._p_.get_cached(c)
    assert 'u1' in settings._p_.get_cached(c)
    c.cfgimpl_reset_cache(only=('settings',))
    assert 'u1' in values._p_.get_cached(c)
    assert 'u1' not in settings._p_.get_cached(c)


def test_force_cache():
    """force_cache() fills both caches; it raises ConfigError without 'cache'."""
    u1 = IntOption('u1', '', multi=True)
    u2 = IntOption('u2', '')
    u3 = IntOption('u3', '', multi=True)
    u4 = IntOption('u4', '', properties=('disabled',))
    od = OptionDescription('od1', '', [u1, u2, u3, u4])
    c = Config(od)
    c.cfgimpl_get_settings().remove('expire')

    c.cfgimpl_get_values().force_cache()
    assert c.cfgimpl_get_values()._p_.get_cached(c) == {
        'u1': {None: ([], None)},
        'u2': {None: (None, None)},
        'u3': {None: ([], None)},
        'u4': {None: (None, None)}}
    assert c.cfgimpl_get_settings()._p_.get_cached(c) == {
        None: {None: (set(['cache', 'validator', 'warnings']), None)},
        'u1': {None: (set(['empty']), None)},
        'u2': {None: (set([]), None)},
        'u3': {None: (set(['empty']), None)},
        'u4': {None: (set(['disabled']), None)}}

    c.read_only()
    c.cfgimpl_get_values().force_cache()
    assert c.cfgimpl_get_values()._p_.get_cached(c) == {
        'u1': {None: ([], None)},
        'u2': {None: (None, None)},
        'u3': {None: ([], None)}}
    assert c.cfgimpl_get_settings()._p_.get_cached(c) == {
        None: {None: (set(['cache', 'disabled', 'empty', 'everything_frozen',
                           'frozen', 'mandatory', 'validator', 'warnings']), None)},
        'u1': {None: (set(['empty']), None)},
        'u2': {None: (set([]), None)},
        'u3': {None: (set(['empty']), None)},
        'u4': {None: (set(['disabled']), None)}}

    c.cfgimpl_get_settings().remove('cache')
    # Callable form instead of a code string: current pytest rejects strings.
    # The lambda keeps the whole expression evaluated inside raises().
    raises(ConfigError, lambda: c.cfgimpl_get_values().force_cache())


def test_cache_master_slave():
    """Cache content of a master/slave group as the master grows."""
    ip_admin_eth0 = StrOption('ip_admin_eth0', "ip réseau autorisé", multi=True)
    netmask_admin_eth0 = StrOption('netmask_admin_eth0', "masque du sous-réseau", multi=True)
    interface1 = OptionDescription('ip_admin_eth0', '', [ip_admin_eth0, netmask_admin_eth0])
    interface1.impl_set_group_type(groups.master)
    maconfig = OptionDescription('toto', '', [interface1])
    cfg = Config(maconfig)
    cfg.read_write()
    assert cfg.cfgimpl_get_values()._p_.get_cached(cfg) == {}
    assert cfg.cfgimpl_get_settings()._p_.get_cached(cfg) == {}
    #
    cfg.ip_admin_eth0.ip_admin_eth0.append('192.168.1.2')
    cfg.ip_admin_eth0.ip_admin_eth0
    cfg.ip_admin_eth0.netmask_admin_eth0
    cache = cfg.cfgimpl_get_values()._p_.get_cached(cfg)
    assert set(cache.keys()) == set(['ip_admin_eth0.ip_admin_eth0',
                                     'ip_admin_eth0.netmask_admin_eth0'])
    assert set(cache['ip_admin_eth0.ip_admin_eth0'].keys()) == set([None])
    assert cache['ip_admin_eth0.ip_admin_eth0'][None][0] == ['192.168.1.2']
    assert set(cache['ip_admin_eth0.netmask_admin_eth0'].keys()) == set([None, 0])
    assert cache['ip_admin_eth0.netmask_admin_eth0'][None][0] == [None]
    assert cache['ip_admin_eth0.netmask_admin_eth0'][0][0] is None
    cache = cfg.cfgimpl_get_settings()._p_.get_cached(cfg)
    assert set(cache.keys()) == set([None, 'ip_admin_eth0',
                                     'ip_admin_eth0.ip_admin_eth0',
                                     'ip_admin_eth0.netmask_admin_eth0'])
    assert set(cache['ip_admin_eth0'].keys()) == set([None])
    assert set(cache['ip_admin_eth0.ip_admin_eth0'].keys()) == set([None])
    assert set(cache['ip_admin_eth0.netmask_admin_eth0'].keys()) == set([None, 0])
    #
    cfg.ip_admin_eth0.ip_admin_eth0.append('192.168.1.1')
    cfg.ip_admin_eth0.ip_admin_eth0
    cfg.ip_admin_eth0.netmask_admin_eth0
    cache = cfg.cfgimpl_get_values()._p_.get_cached(cfg)
    assert set(cache.keys()) == set(['ip_admin_eth0.ip_admin_eth0',
                                     'ip_admin_eth0.netmask_admin_eth0'])
    assert set(cache['ip_admin_eth0.ip_admin_eth0'].keys()) == set([None])
    assert cache['ip_admin_eth0.ip_admin_eth0'][None][0] == ['192.168.1.2', '192.168.1.1']
    assert set(cache['ip_admin_eth0.netmask_admin_eth0'].keys()) == set([None, 0, 1])
    assert cache['ip_admin_eth0.netmask_admin_eth0'][None][0] == [None, None]
    assert cache['ip_admin_eth0.netmask_admin_eth0'][0][0] is None
    assert cache['ip_admin_eth0.netmask_admin_eth0'][1][0] is None
    cache = cfg.cfgimpl_get_settings()._p_.get_cached(cfg)
    assert set(cache.keys()) == set([None, 'ip_admin_eth0',
                                     'ip_admin_eth0.ip_admin_eth0',
                                     'ip_admin_eth0.netmask_admin_eth0'])
    assert set(cache['ip_admin_eth0'].keys()) == set([None])
    assert set(cache['ip_admin_eth0.ip_admin_eth0'].keys()) == set([None])
    assert set(cache['ip_admin_eth0.netmask_admin_eth0'].keys()) == set([None, 0, 1])
    # TODO (from the original): DEL, insert, ...


def test_cache_callback():
    """Setting an option drops its cache entry and those of options computed from it."""
    val1 = StrOption('val1', "", 'val')
    val2 = StrOption('val2', "", callback=return_value,
                     callback_params={'': ((val1, False),)}, properties=('mandatory',))
    val3 = StrOption('val3', "", callback=return_value, callback_params={'': ('yes',)})
    val4 = StrOption('val4', "", callback=return_value,
                     callback_params={'value': ((val1, False),)})
    val5 = StrOption('val5', "", callback=return_value,
                     callback_params={'value': ('yes',)}, multi=True)
    maconfig = OptionDescription('rootconfig', '', [val1, val2, val3, val4, val5])
    cfg = Config(maconfig)
    cfg.cfgimpl_get_settings().remove('expire')
    cfg.read_write()

    cfg.cfgimpl_get_values().force_cache()
    assert cfg.cfgimpl_get_settings()._p_.get_cached(cfg) == {
        None: {None: (set(['cache', 'disabled', 'frozen', 'hidden', 'validator', 'warnings']), None)},
        'val1': {None: (set([]), None)},
        'val2': {None: (set(['mandatory']), None)},
        'val3': {None: (set([]), None)},
        'val4': {None: (set([]), None)},
        'val5': {None: (set(['empty']), None)}}
    assert cfg.cfgimpl_get_values()._p_.get_cached(cfg) == {
        'val1': {None: ('val', None)},
        'val2': {None: ('val', None)},
        'val3': {None: ('yes', None)},
        'val4': {None: ('val', None)},
        'val5': {None: (['yes'], None)}}

    # val2 and val4 are computed from val1: their entries go away with val1's
    cfg.val1 = 'new'
    assert cfg.cfgimpl_get_settings()._p_.get_cached(cfg) == {
        None: {None: (set(['cache', 'disabled', 'frozen', 'hidden', 'validator', 'warnings']), None)},
        'val1': {None: (set([]), None)},
        'val3': {None: (set([]), None)},
        'val5': {None: (set(['empty']), None)}}
    assert cfg.cfgimpl_get_values()._p_.get_cached(cfg) == {
        'val3': {None: ('yes', None)},
        'val5': {None: (['yes'], None)}}
    cfg.cfgimpl_get_values().force_cache()
    assert cfg.cfgimpl_get_settings()._p_.get_cached(cfg) == {
        None: {None: (set(['cache', 'disabled', 'frozen', 'hidden', 'validator', 'warnings']), None)},
        'val1': {None: (set([]), None)},
        'val2': {None: (set(['mandatory']), None)},
        'val3': {None: (set([]), None)},
        'val4': {None: (set([]), None)},
        'val5': {None: (set(['empty']), None)}}
    assert cfg.cfgimpl_get_values()._p_.get_cached(cfg) == {
        'val1': {None: ('new', None)},
        'val2': {None: ('new', None)},
        'val3': {None: ('yes', None)},
        'val4': {None: ('new', None)},
        'val5': {None: (['yes'], None)}}

    cfg.val3 = 'new2'
    assert cfg.cfgimpl_get_settings()._p_.get_cached(cfg) == {
        None: {None: (set(['cache', 'disabled', 'frozen', 'hidden', 'validator', 'warnings']), None)},
        'val1': {None: (set([]), None)},
        'val2': {None: (set(['mandatory']), None)},
        'val3': {None: (set([]), None)},
        'val4': {None: (set([]), None)},
        'val5': {None: (set(['empty']), None)}}
    assert cfg.cfgimpl_get_values()._p_.get_cached(cfg) == {
        'val1': {None: ('new', None)},
        'val2': {None: ('new', None)},
        'val4': {None: ('new', None)},
        'val5': {None: (['yes'], None)}}
    cfg.cfgimpl_get_values().force_cache()
    assert cfg.cfgimpl_get_settings()._p_.get_cached(cfg) == {
        None: {None: (set(['cache', 'disabled', 'frozen', 'hidden', 'validator', 'warnings']), None)},
        'val1': {None: (set([]), None)},
        'val2': {None: (set(['mandatory']), None)},
        'val3': {None: (set([]), None)},
        'val4': {None: (set([]), None)},
        'val5': {None: (set(['empty']), None)}}
    assert cfg.cfgimpl_get_values()._p_.get_cached(cfg) == {
        'val1': {None: ('new', None)},
        'val2': {None: ('new', None)},
        'val3': {None: ('new2', None)},
        'val4': {None: ('new', None)},
        'val5': {None: (['yes'], None)}}

    cfg.val4 = 'new3'
    assert cfg.cfgimpl_get_settings()._p_.get_cached(cfg) == {
        None: {None: (set(['cache', 'disabled', 'frozen', 'hidden', 'validator', 'warnings']), None)},
        'val1': {None: (set([]), None)},
        'val2': {None: (set(['mandatory']), None)},
        'val3': {None: (set([]), None)},
        'val4': {None: (set([]), None)},
        'val5': {None: (set(['empty']), None)}}
    assert cfg.cfgimpl_get_values()._p_.get_cached(cfg) == {
        'val1': {None: ('new', None)},
        'val2': {None: ('new', None)},
        'val3': {None: ('new2', None)},
        'val5': {None: (['yes'], None)}}
    cfg.cfgimpl_get_values().force_cache()
    assert cfg.cfgimpl_get_settings()._p_.get_cached(cfg) == {
        None: {None: (set(['cache', 'disabled', 'frozen', 'hidden', 'validator', 'warnings']), None)},
        'val1': {None: (set([]), None)},
        'val2': {None: (set(['mandatory']), None)},
        'val3': {None: (set([]), None)},
        'val4': {None: (set([]), None)},
        'val5': {None: (set(['empty']), None)}}
    assert cfg.cfgimpl_get_values()._p_.get_cached(cfg) == {
        'val1': {None: ('new', None)},
        'val2': {None: ('new', None)},
        'val3': {None: ('new2', None)},
        'val4': {None: ('new3', None)},
        'val5': {None: (['yes'], None)}}

    cfg.val5.append('new4')
    assert cfg.cfgimpl_get_settings()._p_.get_cached(cfg) == {
        None: {None: (set(['cache', 'disabled', 'frozen', 'hidden', 'validator', 'warnings']), None)},
        'val1': {None: (set([]), None)},
        'val2': {None: (set(['mandatory']), None)},
        'val3': {None: (set([]), None)},
        'val4': {None: (set([]), None)},
        'val5': {None: (set(['empty']), None)}}
    assert cfg.cfgimpl_get_values()._p_.get_cached(cfg) == {
        'val1': {None: ('new', None)},
        'val2': {None: ('new', None)},
        'val3': {None: ('new2', None)},
        'val4': {None: ('new3', None)}}
    cfg.cfgimpl_get_values().force_cache()
    assert cfg.cfgimpl_get_settings()._p_.get_cached(cfg) == {
        None: {None: (set(['cache', 'disabled', 'frozen', 'hidden', 'validator', 'warnings']), None)},
        'val1': {None: (set([]), None)},
        'val2': {None: (set(['mandatory']), None)},
        'val3': {None: (set([]), None)},
        'val4': {None: (set([]), None)},
        'val5': {None: (set(['empty']), None)}}
    assert cfg.cfgimpl_get_values()._p_.get_cached(cfg) == {
        'val1': {None: ('new', None)},
        'val2': {None: ('new', None)},
        'val3': {None: ('new2', None)},
        'val4': {None: ('new3', None)},
        'val5': {None: (['yes', 'new4'], None)}}


def test_cache_master_and_slaves_master():
    """Changing a master or a slave drops the cache entries of the whole group."""
    val1 = StrOption('val1', "", multi=True)
    val2 = StrOption('val2', "", multi=True)
    interface1 = OptionDescription('val1', '', [val1, val2])
    interface1.impl_set_group_type(groups.master)
    maconfig = OptionDescription('rootconfig', '', [interface1])
    cfg = Config(maconfig)
    cfg.cfgimpl_get_settings().remove('expire')
    cfg.read_write()

    cfg.cfgimpl_get_values().force_cache()
    assert cfg.cfgimpl_get_settings()._p_.get_cached(cfg) == {
        None: {None: (set(['cache', 'disabled', 'frozen', 'hidden', 'validator', 'warnings']), None)},
        'val1': {None: (set([]), None)},
        'val1.val1': {None: (set(['empty']), None)},
        'val1.val2': {None: (set([]), None)}}
    assert cfg.cfgimpl_get_values()._p_.get_cached(cfg) == {
        'val1.val1': {None: ([], None)},
        'val1.val2': {None: ([], None)}}

    cfg.val1.val1.append()
    assert cfg.cfgimpl_get_settings()._p_.get_cached(cfg) == {
        None: {None: (set(['cache', 'disabled', 'frozen', 'hidden', 'validator', 'warnings']), None)},
        'val1': {None: (set([]), None)}}
    assert cfg.cfgimpl_get_values()._p_.get_cached(cfg) == {}
    cfg.cfgimpl_get_values().force_cache()
    assert cfg.cfgimpl_get_settings()._p_.get_cached(cfg) == {
        None: {None: (set(['cache', 'disabled', 'frozen', 'hidden', 'validator', 'warnings']), None)},
        'val1': {None: (set([]), None)},
        'val1.val1': {None: (set(['empty']), None)},
        'val1.val2': {None: (set([]), None), 0: (set([]), None)}}
    assert cfg.cfgimpl_get_values()._p_.get_cached(cfg) == {
        'val1.val1': {None: ([None], None)},
        'val1.val2': {None: ([None], None), 0: (None, None)}}

    cfg.val1.val1.append()
    cfg.cfgimpl_get_values().force_cache()
    cfg.val1.val2[1] = 'oui'
    assert cfg.cfgimpl_get_settings()._p_.get_cached(cfg) == {
        None: {None: (set(['cache', 'disabled', 'frozen', 'hidden', 'validator', 'warnings']), None)},
        'val1': {None: (set([]), None)}}
    assert cfg.cfgimpl_get_values()._p_.get_cached(cfg) == {}
    cfg.cfgimpl_get_values().force_cache()
    assert cfg.cfgimpl_get_settings()._p_.get_cached(cfg) == {
        None: {None: (set(['cache', 'disabled', 'frozen', 'hidden', 'validator', 'warnings']), None)},
        'val1': {None: (set([]), None)},
        'val1.val1': {None: (set(['empty']), None)},
        'val1.val2': {None: (set([]), None), 0: (set([]), None), 1: (set([]), None)}}
    assert cfg.cfgimpl_get_values()._p_.get_cached(cfg) == {
        'val1.val1': {None: ([None, None], None)},
        'val1.val2': {None: ([None, 'oui'], None), 0: (None, None), 1: ('oui', None)}}


def test_cache_master_callback():
    """Master/slave group whose slave is computed from the master by a callback."""
    val1 = StrOption('val1', "", multi=True)
    val2 = StrOption('val2', "", multi=True, callback=return_value,
                     callback_params={'value': ((val1, False),)})
    interface1 = OptionDescription('val1', '', [val1, val2])
    interface1.impl_set_group_type(groups.master)
    maconfig = OptionDescription('rootconfig', '', [interface1])
    cfg = Config(maconfig)
    cfg.cfgimpl_get_settings().remove('expire')
    cfg.read_write()

    cfg.cfgimpl_get_values().force_cache()
    assert cfg.cfgimpl_get_settings()._p_.get_cached(cfg) == {
        None: {None: (set(['cache', 'disabled', 'frozen', 'hidden', 'validator', 'warnings']), None)},
        'val1': {None: (set([]), None)},
        'val1.val1': {None: (set(['empty']), None)},
        'val1.val2': {None: (set([]), None)}}
    assert cfg.cfgimpl_get_values()._p_.get_cached(cfg) == {
        'val1.val1': {None: ([], None)},
        'val1.val2': {None: ([], None)}}

    cfg.val1.val1.append()
    assert cfg.cfgimpl_get_settings()._p_.get_cached(cfg) == {
        None: {None: (set(['cache', 'disabled', 'frozen', 'hidden', 'validator', 'warnings']), None)},
        'val1': {None: (set([]), None)}}
    assert cfg.cfgimpl_get_values()._p_.get_cached(cfg) == {}
    cfg.cfgimpl_get_values().force_cache()
    assert cfg.cfgimpl_get_settings()._p_.get_cached(cfg) == {
        None: {None: (set(['cache', 'disabled', 'frozen', 'hidden', 'validator', 'warnings']), None)},
        'val1': {None: (set([]), None)},
        'val1.val1': {None: (set(['empty']), None)},
        'val1.val2': {None: (set([]), None), 0: (set([]), None)}}
    # NOTE: this assertion was already disabled in the original file.
    #assert cfg.cfgimpl_get_values()._p_.get_cached(cfg) == {'val1.val1': {None: ([None], None)},
    #                                                         'val1.val2': {None: ([None], None), 0: (None, None)}}


def test_cache_requires():
    """Cache behaviour for an option whose 'disabled' property comes from a requirement."""
    a = BoolOption('activate_service', '', True)
    b = IPOption('ip_address_service', '',
                 requires=[{'option': a, 'expected': False, 'action': 'disabled'}])
    od = OptionDescription('service', '', [a, b])
    c = Config(od)
    c.cfgimpl_get_settings().remove('expire')
    c.read_write()
    assert c.cfgimpl_get_settings()._p_.get_cached(c) == {}
    assert c.cfgimpl_get_values()._p_.get_cached(c) == {}

    assert c.ip_address_service is None
    assert c.cfgimpl_get_settings()._p_.get_cached(c) == {
        None: {None: (set(['cache', 'disabled', 'frozen', 'hidden', 'validator', 'warnings']), None)},
        'activate_service': {None: (set([]), None)},
        'ip_address_service': {None: (set([]), None)}}
    assert c.cfgimpl_get_values()._p_.get_cached(c) == {
        'ip_address_service': {None: (None, None)}}
    c.cfgimpl_get_values().force_cache()
    assert c.cfgimpl_get_settings()._p_.get_cached(c) == {
        None: {None: (set(['cache', 'disabled', 'frozen', 'hidden', 'validator', 'warnings']), None)},
        'activate_service': {None: (set([]), None)},
        'ip_address_service': {None: (set([]), None)}}
    assert c.cfgimpl_get_values()._p_.get_cached(c) == {
        'ip_address_service': {None: (None, None)},
        'activate_service': {None: (True, None)}}

    c.ip_address_service = '1.1.1.1'
    assert c.cfgimpl_get_settings()._p_.get_cached(c) == {
        None: {None: (set(['cache', 'disabled', 'frozen', 'hidden', 'validator', 'warnings']), None)},
        'activate_service': {None: (set([]), None)},
        'ip_address_service': {None: (set([]), None)}}
    assert c.cfgimpl_get_values()._p_.get_cached(c) == {
        'activate_service': {None: (True, None)}}
    c.cfgimpl_get_values().force_cache()
    assert c.cfgimpl_get_settings()._p_.get_cached(c) == {
        None: {None: (set(['cache', 'disabled', 'frozen', 'hidden', 'validator', 'warnings']), None)},
        'activate_service': {None: (set([]), None)},
        'ip_address_service': {None: (set([]), None)}}
    assert c.cfgimpl_get_values()._p_.get_cached(c) == {
        'ip_address_service': {None: ('1.1.1.1', None)},
        'activate_service': {None: (True, None)}}

    # the requirement on activate_service now disables ip_address_service
    c.activate_service = False
    assert c.cfgimpl_get_settings()._p_.get_cached(c) == {
        None: {None: (set(['cache', 'disabled', 'frozen', 'hidden', 'validator', 'warnings']), None)},
        'activate_service': {None: (set([]), None)}}
    assert c.cfgimpl_get_values()._p_.get_cached(c) == {}
    c.cfgimpl_get_values().force_cache()
    assert c.cfgimpl_get_settings()._p_.get_cached(c) == {
        None: {None: (set(['cache', 'disabled', 'frozen', 'hidden', 'validator', 'warnings']), None)},
        'activate_service': {None: (set([]), None)},
        'ip_address_service': {None: (set(['disabled']), None)}}
    assert c.cfgimpl_get_values()._p_.get_cached(c) == {
        'activate_service': {None: (False, None)}}


def test_cache_global_properties():
    """The cached global properties follow remove()/append() on the settings."""
    a = BoolOption('activate_service', '', True)
    b = IPOption('ip_address_service', '',
                 requires=[{'option': a, 'expected': False, 'action': 'disabled'}])
    od = OptionDescription('service', '', [a, b])
    c = Config(od)
    c.cfgimpl_get_settings().remove('expire')
    c.read_write()
    assert c.cfgimpl_get_settings()._p_.get_cached(c) == {}
    assert c.cfgimpl_get_values()._p_.get_cached(c) == {}

    assert c.ip_address_service is None
    assert c.cfgimpl_get_settings()._p_.get_cached(c) == {
        None: {None: (set(['cache', 'disabled', 'frozen', 'hidden', 'validator', 'warnings']), None)},
        'activate_service': {None: (set([]), None)},
        'ip_address_service': {None: (set([]), None)}}
    assert c.cfgimpl_get_values()._p_.get_cached(c) == {
        'ip_address_service': {None: (None, None)}}

    c.cfgimpl_get_settings().remove('disabled')
    assert c.ip_address_service is None
    assert c.cfgimpl_get_settings()._p_.get_cached(c) == {
        None: {None: (set(['cache', 'frozen', 'hidden', 'validator', 'warnings']), None)},
        'activate_service': {None: (set([]), None)},
        'ip_address_service': {None: (set([]), None)}}

    c.cfgimpl_get_settings().append('test')
    assert c.ip_address_service is None
    assert c.cfgimpl_get_settings()._p_.get_cached(c) == {
        None: {None: (set(['cache', 'frozen', 'hidden', 'validator', 'warnings', 'test']), None)},
        'activate_service': {None: (set([]), None)},
        'ip_address_service': {None: (set([]), None)}}


def test_callback_value_incr():
    """A callback value is recomputed only once its cached entry has aged out."""
    val1 = IntOption('val1', "", callback=return_incr)
    val2 = IntOption('val2', "", callback=return_value,
                     callback_params={'value': ((val1, False),)})
    maconfig = OptionDescription('rootconfig', '', [val1, val2])
    cfg = Config(maconfig)
    cfg.read_write()
    assert cfg.val1 == 1
    # expires_time is 1 (set at module top); val2 still sees the cached val1
    sleep(1)
    assert cfg.val2 == 1
    sleep(1)
    assert cfg.val1 == 2
    assert cfg.val2 == 2