tiramisu/test/test_cache.py

289 lines
9.2 KiB
Python

# coding: utf-8
import autopath
from tiramisu import setting
setting.expires_time = 1
from tiramisu.option import IntOption, OptionDescription
from tiramisu.config import Config
from tiramisu.error import ConfigError
from time import sleep, time
from py.test import raises
def make_description():
u1 = IntOption('u1', '', multi=True)
u2 = IntOption('u2', '')
u3 = IntOption('u3', '', multi=True)
return OptionDescription('od1', '', [u1, u2, u3])
def test_cache_config():
od1 = make_description()
assert od1.impl_already_build_caches() == False
c = Config(od1)
assert od1.impl_already_build_caches() == True
def test_cache():
od1 = make_description()
c = Config(od1)
values = c.cfgimpl_get_values()
settings = c.cfgimpl_get_settings()
c.u1
assert 'u1' in values._p_.get_cached(c)
assert 'u1' in settings._p_.get_cached(c)
c.u2
assert 'u1' in values._p_.get_cached(c)
assert 'u1' in settings._p_.get_cached(c)
assert 'u2' in values._p_.get_cached(c)
assert 'u2' in settings._p_.get_cached(c)
def test_get_cache():
# force a value in cache, try if reget corrupted value
od1 = make_description()
c = Config(od1)
values = c.cfgimpl_get_values()
settings = c.cfgimpl_get_settings()
ntime = time() + 1
settings._p_.setcache('u1', set(['inject']), ntime)
assert 'inject' in settings[od1.u1]
values._p_.setcache('u1', 100, ntime)
assert c.u1 == [100]
def test_get_cache_no_expire():
# force a value in cache, try if reget corrupted value
od1 = make_description()
c = Config(od1)
values = c.cfgimpl_get_values()
settings = c.cfgimpl_get_settings()
settings._p_.setcache('u1', set(['inject2']), None)
assert 'inject2' in settings[od1.u1]
values._p_.setcache('u1', 200, None)
assert c.u1 == [200]
def test_cache_reset():
od1 = make_description()
c = Config(od1)
values = c.cfgimpl_get_values()
settings = c.cfgimpl_get_settings()
#when change a value
c.u1
assert 'u1' in values._p_.get_cached(c)
assert 'u1' in settings._p_.get_cached(c)
c.u2 = 1
assert 'u1' not in values._p_.get_cached(c)
assert 'u1' not in settings._p_.get_cached(c)
#when remove a value
c.u1
assert 'u1' in values._p_.get_cached(c)
assert 'u1' in settings._p_.get_cached(c)
del(c.u2)
assert 'u1' not in values._p_.get_cached(c)
assert 'u1' not in settings._p_.get_cached(c)
#when add/del property
c.u1
assert 'u1' in values._p_.get_cached(c)
assert 'u1' in settings._p_.get_cached(c)
c.cfgimpl_get_settings()[od1.u2].append('test')
assert 'u1' not in values._p_.get_cached(c)
assert 'u1' not in settings._p_.get_cached(c)
c.u1
assert 'u1' in values._p_.get_cached(c)
assert 'u1' in settings._p_.get_cached(c)
c.cfgimpl_get_settings()[od1.u2].remove('test')
assert 'u1' not in values._p_.get_cached(c)
assert 'u1' not in settings._p_.get_cached(c)
#when enable/disabled property
c.u1
assert 'u1' in values._p_.get_cached(c)
assert 'u1' in settings._p_.get_cached(c)
c.cfgimpl_get_settings().append('test')
assert 'u1' not in values._p_.get_cached(c)
assert 'u1' not in settings._p_.get_cached(c)
c.u1
assert 'u1' in values._p_.get_cached(c)
assert 'u1' in settings._p_.get_cached(c)
c.cfgimpl_get_settings().remove('test')
assert 'u1' not in values._p_.get_cached(c)
assert 'u1' not in settings._p_.get_cached(c)
def test_cache_reset_multi():
od1 = make_description()
c = Config(od1)
values = c.cfgimpl_get_values()
settings = c.cfgimpl_get_settings()
#when change a value
c.u1
assert 'u1' in values._p_.get_cached(c)
assert 'u1' in settings._p_.get_cached(c)
c.u3 = [1]
assert 'u1' not in values._p_.get_cached(c)
assert 'u1' not in settings._p_.get_cached(c)
#when append value
c.u1
assert 'u1' in values._p_.get_cached(c)
assert 'u1' in settings._p_.get_cached(c)
c.u3.append(1)
assert 'u1' not in values._p_.get_cached(c)
assert 'u1' not in settings._p_.get_cached(c)
#when pop value
c.u1
assert 'u1' in values._p_.get_cached(c)
assert 'u1' in settings._p_.get_cached(c)
c.u3.pop(1)
assert 'u1' not in values._p_.get_cached(c)
assert 'u1' not in settings._p_.get_cached(c)
#when remove a value
c.u1
assert 'u1' in values._p_.get_cached(c)
assert 'u1' in settings._p_.get_cached(c)
del(c.u3)
assert 'u1' not in values._p_.get_cached(c)
assert 'u1' not in settings._p_.get_cached(c)
def test_reset_cache():
od1 = make_description()
c = Config(od1)
values = c.cfgimpl_get_values()
settings = c.cfgimpl_get_settings()
c.u1
assert 'u1' in values._p_.get_cached(c)
assert 'u1' in settings._p_.get_cached(c)
c.cfgimpl_reset_cache()
assert 'u1' not in values._p_.get_cached(c)
assert 'u1' not in settings._p_.get_cached(c)
c.u1
sleep(1)
c.u1
sleep(1)
c.u2
assert 'u1' in values._p_.get_cached(c)
assert 'u1' in settings._p_.get_cached(c)
assert 'u2' in values._p_.get_cached(c)
assert 'u2' in settings._p_.get_cached(c)
c.cfgimpl_reset_cache()
assert 'u1' not in values._p_.get_cached(c)
assert 'u1' not in settings._p_.get_cached(c)
assert 'u2' not in values._p_.get_cached(c)
assert 'u2' not in settings._p_.get_cached(c)
def test_reset_cache_subconfig():
od1 = make_description()
od2 = OptionDescription('od2', '', [od1])
c = Config(od2)
values = c.cfgimpl_get_values()
c.od1.u1
assert 'od1.u1' in values._p_.get_cached(c)
c.od1.cfgimpl_reset_cache()
assert 'od1.u1' not in values._p_.get_cached(c)
def test_reset_cache_only_expired():
od1 = make_description()
c = Config(od1)
values = c.cfgimpl_get_values()
settings = c.cfgimpl_get_settings()
c.u1
assert 'u1' in values._p_.get_cached(c)
assert 'u1' in settings._p_.get_cached(c)
c.cfgimpl_reset_cache(True)
assert 'u1' in values._p_.get_cached(c)
assert 'u1' in settings._p_.get_cached(c)
sleep(1)
c.u1
sleep(1)
c.u2
assert 'u1' in values._p_.get_cached(c)
assert 'u1' in settings._p_.get_cached(c)
assert 'u2' in values._p_.get_cached(c)
assert 'u2' in settings._p_.get_cached(c)
c.cfgimpl_reset_cache(True)
assert 'u1' not in values._p_.get_cached(c)
assert 'u1' not in settings._p_.get_cached(c)
assert 'u2' in values._p_.get_cached(c)
assert 'u2' in settings._p_.get_cached(c)
def test_cache_not_expire():
od1 = make_description()
c = Config(od1)
values = c.cfgimpl_get_values()
settings = c.cfgimpl_get_settings()
settings.remove('expire')
c.u1
assert 'u1' in values._p_.get_cached(c)
assert 'u1' in settings._p_.get_cached(c)
c.cfgimpl_reset_cache(True)
assert 'u1' in values._p_.get_cached(c)
assert 'u1' in settings._p_.get_cached(c)
sleep(1)
c.u2
assert 'u1' in values._p_.get_cached(c)
assert 'u1' in settings._p_.get_cached(c)
assert 'u2' in values._p_.get_cached(c)
assert 'u2' in settings._p_.get_cached(c)
c.cfgimpl_reset_cache(True)
assert 'u1' in values._p_.get_cached(c)
assert 'u1' in settings._p_.get_cached(c)
assert 'u2' in values._p_.get_cached(c)
assert 'u2' in settings._p_.get_cached(c)
def test_cache_not_cache():
od1 = make_description()
c = Config(od1)
values = c.cfgimpl_get_values()
settings = c.cfgimpl_get_settings()
settings.remove('cache')
c.u1
assert 'u1' not in values._p_.get_cached(c)
assert 'u1' not in settings._p_.get_cached(c)
def test_reset_cache_only():
od1 = make_description()
c = Config(od1)
values = c.cfgimpl_get_values()
settings = c.cfgimpl_get_settings()
c.u1
assert 'u1' in values._p_.get_cached(c)
assert 'u1' in settings._p_.get_cached(c)
c.cfgimpl_reset_cache(only=('values',))
assert 'u1' not in values._p_.get_cached(c)
assert 'u1' in settings._p_.get_cached(c)
c.u1
assert 'u1' in values._p_.get_cached(c)
assert 'u1' in settings._p_.get_cached(c)
c.cfgimpl_reset_cache(only=('settings',))
assert 'u1' in values._p_.get_cached(c)
assert 'u1' not in settings._p_.get_cached(c)
def test_force_cache():
u1 = IntOption('u1', '', multi=True)
u2 = IntOption('u2', '')
u3 = IntOption('u3', '', multi=True)
u4 = IntOption('u4', '', properties=('disabled',))
od = OptionDescription('od1', '', [u1, u2, u3, u4])
c = Config(od)
c.cfgimpl_get_settings().remove('expire')
c.cfgimpl_get_values().force_cache()
assert c.cfgimpl_get_values()._p_.get_cached(c) == {'u1': ([], None), 'u3': ([], None), 'u2': (None, None), 'u4': (None, None)}
assert c.cfgimpl_get_settings()._p_.get_cached(c) == {'u4': (set(['disabled']), None), 'u1': (set([]), None), 'u3': (set([]), None), 'u2': (set([]), None)}
c.read_only()
c.cfgimpl_get_values().force_cache()
assert c.cfgimpl_get_values()._p_.get_cached(c) == {'u1': ([], None), 'u3': ([], None), 'u2': (None, None)}
assert c.cfgimpl_get_settings()._p_.get_cached(c) == {'u4': (set(['disabled']), None), 'u1': (set([]), None), 'u3': (set([]), None), 'u2': (set([]), None)}
c.cfgimpl_get_settings().remove('cache')
raises(ConfigError, "c.cfgimpl_get_values().force_cache()")