tiramisu/tests/test_storage.py

334 lines
12 KiB
Python

# coding: utf-8
from .autopath import do_autopath
do_autopath()
from py.test import raises
import pytest
from tiramisu.error import ConfigError
from tiramisu import Config, BoolOption, OptionDescription, Leadership, \
list_sessions, delete_session, default_storage, MetaConfig
from tiramisu.setting import groups, owners
from .config import event_loop
@pytest.mark.asyncio
async def test_non_persistent():
b = BoolOption('b', '')
o = OptionDescription('od', '', [b])
async with await Config(o, session_id='test_non_persistent', delete_old_session=True) as cfg:
pass
assert not await list_sessions()
@pytest.mark.asyncio
async def test_list():
b = BoolOption('b', '')
o = OptionDescription('od', '', [b])
async with await Config(o, session_id='test_non_persistent') as cfg:
await cfg.option('b').value.set(True)
assert 'test_non_persistent' in await list_sessions()
assert 'test_non_persistent' not in await list_sessions()
assert not await list_sessions()
@pytest.mark.asyncio
async def test_create_persistent():
b = BoolOption('b', '')
o = OptionDescription('od', '', [b])
await Config(o, session_id='test_persistent')
await delete_session('test_persistent')
assert not await list_sessions()
@pytest.mark.asyncio
async def test_list_sessions_persistent():
b = BoolOption('b', '')
o = OptionDescription('od', '', [b])
cfg = await Config(o, session_id='test_persistent')
await cfg.option('b').value.set(True)
assert 'test_persistent' in await list_sessions()
await delete_session('test_persistent')
assert not await list_sessions()
@pytest.mark.asyncio
async def test_delete_session_persistent():
b = BoolOption('b', '')
o = OptionDescription('od', '', [b])
await Config(o, session_id='test_persistent')
assert 'test_persistent' in await list_sessions()
await delete_session('test_persistent')
assert 'test_persistent' not in await list_sessions()
assert not await list_sessions()
@pytest.mark.asyncio
async def test_create_persistent_retrieve():
b = BoolOption('b', '')
o = OptionDescription('od', '', [b])
cfg = await Config(o, session_id='test_persistent')
assert await cfg.option('b').value.get() is None
await cfg.option('b').value.set(True)
assert await cfg.option('b').value.get() is True
del cfg
cfg = await Config(o, session_id='test_persistent')
assert await cfg.option('b').value.get() is True
assert 'test_persistent' in await list_sessions()
await delete_session(await cfg.session.id())
del cfg
cfg = await Config(o, session_id='test_persistent')
assert await cfg.option('b').value.get() is None
await delete_session(await cfg.session.id())
del cfg
assert not await list_sessions()
@pytest.mark.asyncio
async def test_two_persistent():
b = BoolOption('b', '')
o = OptionDescription('od', '', [b])
cfg = await Config(o, session_id='test_persistent')
cfg2 = await Config(o, session_id='test_persistent')
await cfg2.property.pop('cache')
assert await cfg.option('b').value.get() is None
assert await cfg2.option('b').value.get() is None
#
await cfg.option('b').value.set(False)
assert await cfg.option('b').value.get() is False
assert await cfg2.option('b').value.get() is False
#
await cfg.option('b').value.set(True)
assert await cfg.option('b').value.get() is True
assert await cfg2.option('b').value.get() is True
await delete_session('test_persistent')
assert not await list_sessions()
@pytest.mark.asyncio
async def test_create_persistent_retrieve_owner():
b = BoolOption('b', '')
o = OptionDescription('od', '', [b])
cfg = await Config(o, session_id='test_persistent')
assert await cfg.option('b').owner.isdefault()
await cfg.option('b').value.set(True)
assert await cfg.option('b').value.get()
assert await cfg.option('b').owner.get() == 'user'
##owners.addowner('persistentowner')
await cfg.option('b').owner.set('persistentowner')
assert await cfg.option('b').owner.get() == 'persistentowner'
del cfg
#
cfg = await Config(o, session_id='test_persistent')
await cfg.option('b').owner.set('persistentowner')
await delete_session(await cfg.session.id())
del cfg
#
cfg = await Config(o, session_id='test_persistent')
assert await cfg.option('b').value.get() is None
assert await cfg.option('b').owner.isdefault()
await delete_session(await cfg.session.id())
del cfg
assert not await list_sessions()
@pytest.mark.asyncio
async def test_create_persistent_retrieve_owner_leadership():
a = BoolOption('a', '', multi=True)
b = BoolOption('b', '', multi=True)
o = Leadership('a', '', [a, b])
o1 = OptionDescription('a', '', [o])
cfg = await Config(o1, session_id='test_persistent')
assert await cfg.option('a.a').owner.isdefault()
await cfg.option('a.a').value.set([True, False])
await cfg.option('a.b', 1).value.set(True)
assert await cfg.option('a.a').owner.get() == 'user'
assert await cfg.option('a.b', 0).owner.isdefault()
assert await cfg.option('a.b', 1).owner.get() == 'user'
#owners.addowner('persistentowner2')
await cfg.option('a.b', 1).owner.set('persistentowner2')
await cfg.option('a.b', 0).value.set(True)
assert await cfg.option('a.b', 0).owner.get() == 'user'
assert await cfg.option('a.b', 1).owner.get() == 'persistentowner2'
assert await cfg.option('a.a').value.get() == [True, False]
del cfg
#
cfg = await Config(o1, session_id='test_persistent')
assert await cfg.option('a.a').value.get() == [True, False]
assert await cfg.option('a.b', 0).owner.get() == 'user'
assert await cfg.option('a.b', 1).owner.get() == 'persistentowner2'
await delete_session(await cfg.session.id())
del cfg
#
cfg = await Config(o1, session_id='test_persistent')
assert await cfg.option('a.a').value.get() == []
await delete_session(await cfg.session.id())
del cfg
assert not await list_sessions()
@pytest.mark.asyncio
async def test_two_persistent_owner():
b = BoolOption('b', '')
o = OptionDescription('od', '', [b])
cfg = await Config(o, session_id='test_persistent')
await cfg.property.pop('cache')
cfg2 = await Config(o, session_id='test_persistent')
await cfg2.property.pop('cache')
assert await cfg.option('b').owner.isdefault()
assert await cfg2.option('b').owner.isdefault()
await cfg.option('b').value.set(False)
assert await cfg.option('b').owner.get() == 'user'
assert await cfg2.option('b').owner.get() == 'user'
await cfg.option('b').owner.set('persistent')
assert await cfg.option('b').owner.get() == 'persistent'
assert await cfg2.option('b').owner.get() == 'persistent'
await delete_session('test_persistent')
assert not await list_sessions()
@pytest.mark.asyncio
async def test_create_persistent_retrieve_information():
b = BoolOption('b', '')
o = OptionDescription('od', '', [b])
cfg = await Config(o, session_id='test_persistent')
await cfg.information.set('info', 'string')
assert await cfg.information.get('info') == 'string'
del cfg
#
cfg = await Config(o, session_id='test_persistent')
assert await cfg.information.get('info') == 'string'
await delete_session(await cfg.session.id())
del cfg
#
cfg = await Config(o, session_id='test_persistent')
assert await cfg.information.get('info', None) is None
await delete_session(await cfg.session.id())
del cfg
assert not await list_sessions()
@pytest.mark.asyncio
async def test_two_persistent_information():
b = BoolOption('b', '')
o = OptionDescription('od', '', [b])
cfg = await Config(o, session_id='test_persistent')
await cfg.property.pop('cache')
await cfg.information.set('info', 'string')
assert await cfg.information.get('info') == 'string'
cfg2 = await Config(o, session_id='test_persistent')
await cfg2.property.pop('cache')
assert await cfg2.information.get('info') == 'string'
await delete_session('test_persistent')
assert not await list_sessions()
@pytest.mark.asyncio
async def test_two_different_persistents():
b = BoolOption('b', '')
o = OptionDescription('od', '', [b])
cfg = await Config(o, session_id='test_persistent')
await cfg.property.pop('cache')
cfg2 = await Config(o, session_id='test_persistent2')
await cfg2.property.pop('cache')
await cfg.option('b').property.add('test')
assert await cfg.option('b').property.get() == {'test'}
assert await cfg2.option('b').property.get() == set()
assert await cfg.option('b').value.get() is None
assert await cfg2.option('b').value.get() is None
await cfg.option('b').value.set(True)
assert await cfg.option('b').value.get() == True
assert await cfg2.option('b').value.get() is None
await delete_session('test_persistent')
await delete_session('test_persistent2')
assert not await list_sessions()
@pytest.mark.asyncio
async def test_two_different_information():
b = BoolOption('b', '')
o = OptionDescription('od', '', [b])
cfg = await Config(o, session_id='test_persistent')
await cfg.information.set('a', 'a')
cfg2 = await Config(o, session_id='test_persistent2')
await cfg2.information.set('a', 'b')
assert await cfg.information.get('a') == 'a'
assert await cfg2.information.get('a') == 'b'
await delete_session('test_persistent')
await delete_session('test_persistent2')
assert not await list_sessions()
@pytest.mark.asyncio
async def test_exportation_importation():
b = BoolOption('b', '')
o = OptionDescription('od', '', [b])
cfg = await Config(o, session_id='test_persistent')
cfg2 = await Config(o, session_id='test_persistent2')
cfg3 = await Config(o, session_id='test_persistent3')
await cfg.owner.set('export')
assert await cfg.option('b').value.get() is None
await cfg.option('b').value.set(True)
assert await cfg.option('b').value.get() is True
assert await cfg.owner.get() == 'export'
del cfg
#
cfg = await Config(o, session_id='test_persistent')
assert await cfg.owner.get() == 'export'
assert await cfg.value.exportation() == [['b'], [None], [True], ['export']]
await cfg2.value.importation(await cfg.value.exportation())
assert await cfg.value.exportation() == [['b'], [None], [True], ['export']]
assert await cfg.owner.get() == 'export'
assert await cfg2.value.exportation() == [['b'], [None], [True], ['export']]
assert await cfg2.owner.get() == 'user'
del cfg2
#
cfg2 = await Config(o, session_id='test_persistent2')
assert await cfg2.value.exportation() == [['b'], [None], [True], ['export']]
assert await cfg2.owner.get() == 'user'
#
await cfg3.value.importation(await cfg.value.exportation(with_default_owner=True))
assert await cfg3.value.exportation() == [['b'], [None], [True], ['export']]
assert await cfg3.owner.get() == 'export'
del cfg3
#
cfg3 = await Config(o, session_id='test_persistent3')
assert await cfg3.value.exportation() == [['b'], [None], [True], ['export']]
assert await cfg3.owner.get() == 'export'
#
await delete_session('test_persistent')
await delete_session('test_persistent2')
await delete_session('test_persistent3')
assert not await list_sessions()
@pytest.mark.asyncio
async def test_create_persistent_context_property():
b = BoolOption('b', '')
o = OptionDescription('od', '', [b])
cfg = await Config(o, session_id='test_persistent')
await cfg.property.add('persistent')
del cfg
#
cfg = await Config(o, session_id='test_persistent')
assert 'persistent' in await cfg.property.get()
del cfg
await delete_session('test_persistent')
assert not await list_sessions()
@pytest.mark.asyncio
async def test_create_persistent_property():
b = BoolOption('b', '')
o = OptionDescription('od', '', [b])
cfg = await Config(o, session_id='test_persistent')
await cfg.option('b').property.add('persistent')
del cfg
#
cfg = await Config(o, session_id='test_persistent')
assert 'persistent' in await cfg.option('b').property.get()
del cfg
await delete_session('test_persistent')