from .autopath import do_autopath do_autopath() import pytest from tiramisu.setting import groups, owners from tiramisu import IntOption, StrOption, NetworkOption, NetmaskOption, \ OptionDescription, Leadership, Config, GroupConfig, MixConfig, \ MetaConfig, Params, ParamOption, ParamValue, ParamSelfOption, Calculation, \ valid_network_netmask, delete_session from tiramisu.error import ConfigError, ConflictError, PropertiesOptionError, LeadershipError, APIError from tiramisu.storage import list_sessions from .config import delete_sessions, event_loop owners.addowner('mix1') owners.addowner('mix2') def return_value(value=None): return value def raise_exception(): raise Exception('test') def make_description(): i1 = IntOption('i1', '') i2 = IntOption('i2', '', default=1) i3 = IntOption('i3', '') i4 = IntOption('i4', '', default=2) i5 = IntOption('i5', '', default=[2], multi=True) i6 = IntOption('i6', '', properties=('disabled',)) od1 = OptionDescription('od1', '', [i1, i2, i3, i4, i5, i6]) od2 = OptionDescription('od2', '', [od1]) return od2 def make_description1(): i1 = IntOption('i1', '') i2 = IntOption('i2', '', default=1) i3 = IntOption('i3', '') i4 = IntOption('i4', '', default=2) i5 = IntOption('i5', '', default=[2], multi=True) i6 = IntOption('i6', '', properties=('disabled',)) od1 = OptionDescription('od1', '', [i1, i2, i3, i4, i5, i6]) od2 = OptionDescription('od2', '', [od1]) return od2 def make_description2(): i1 = IntOption('i1', '') i2 = IntOption('i2', '', default=1) i3 = IntOption('i3', '') i4 = IntOption('i4', '', default=2) i5 = IntOption('i5', '', default=[2], multi=True) i6 = IntOption('i6', '', properties=('disabled',)) od1 = OptionDescription('od1', '', [i1, i2, i3, i4, i5, i6]) od2 = OptionDescription('od2', '', [od1]) return od2 def make_description3(): i1 = IntOption('i1', '') i2 = IntOption('i2', '', default=1) i3 = IntOption('i3', '') i4 = IntOption('i4', '', default=2) i5 = IntOption('i5', '', default=[2], multi=True) i6 = IntOption('i6', '', properties=('disabled',)) od1 = OptionDescription('od1', '', [i1, i2, i3, i4, i5, i6]) od2 = OptionDescription('od2', '', [od1]) return od2 async def make_mixconfig(double=False): od1 = make_description() od2 = make_description1() od3 = make_description2() conf1 = await Config(od1, session_id='conf1', delete_old_session=True) await conf1.property.read_write() conf2 = await Config(od2, session_id='conf2', delete_old_session=True) await conf2.property.read_write() mix = await MixConfig(od3, [conf1, conf2], session_id='mix', delete_old_session=True) if double: od4 = make_description3() await mix.owner.set(owners.mix2) mix = await MixConfig(od4, [mix], session_id='doublemix') await mix.property.read_write() await mix.owner.set(owners.mix1) return mix @pytest.mark.asyncio async def test_mix_name(): mix = await make_mixconfig(True) assert await mix.config.path() == 'doublemix' ret = await mix.config('mix') assert await ret.config.path() == 'doublemix.mix' ret = await mix.config('mix.conf1') assert await ret.config.path() == 'doublemix.mix.conf1' ret = await mix.config('mix.conf2') assert await ret.config.path() == 'doublemix.mix.conf2' await delete_sessions(mix) @pytest.mark.asyncio async def test_mix_not_group(): i1 = IntOption('i1', '') od1 = OptionDescription('od1', '', [i1]) od2 = OptionDescription('od2', '', [od1]) async with await Config(od2, session_id='conf1') as conf1: grp = await GroupConfig([conf1]) with pytest.raises(TypeError): await MixConfig(od2, [grp], session_id='error') await delete_session('error') assert not await list_sessions() @pytest.mark.asyncio async def test_unknown_config(): mix = await make_mixconfig() with pytest.raises(ConfigError): await mix.config('unknown') await delete_sessions(mix) @pytest.mark.asyncio async def test_none(): mix = await make_mixconfig() newconf1 = await mix.config('conf1') newconf2 = await mix.config('conf2') assert await mix.option('od1.i3').value.get() is await newconf1.option('od1.i3').value.get() is await newconf2.option('od1.i3').value.get() is None assert await mix.option('od1.i3').owner.get() is await newconf1.option('od1.i3').owner.get() is await newconf2.option('od1.i3').owner.get() is owners.default # await mix.option('od1.i3').value.set(3) assert await mix.option('od1.i3').value.get() == await newconf1.option('od1.i3').value.get() == await newconf2.option('od1.i3').value.get() == 3 assert await mix.option('od1.i3').owner.get() is await newconf1.option('od1.i3').owner.get() is await newconf2.option('od1.i3').owner.get() is owners.mix1 # await newconf1.option('od1.i3').value.set(2) assert await mix.option('od1.i3').value.get() == await newconf2.option('od1.i3').value.get() == 3 assert await newconf1.option('od1.i3').value.get() == 2 assert await mix.option('od1.i3').owner.get() is await newconf2.option('od1.i3').owner.get() is owners.mix1 assert await newconf1.option('od1.i3').owner.get() is owners.user # await mix.option('od1.i3').value.set(4) assert await mix.option('od1.i3').value.get() == await newconf2.option('od1.i3').value.get() == 4 assert await newconf1.option('od1.i3').value.get() == 2 assert await mix.option('od1.i3').owner.get() is await newconf2.option('od1.i3').owner.get() is owners.mix1 assert await newconf1.option('od1.i3').owner.get() is owners.user # await mix.option('od1.i3').value.reset() assert await mix.option('od1.i3').value.get() is await newconf2.option('od1.i3').value.get() is None assert await newconf1.option('od1.i3').value.get() == 2 assert await mix.option('od1.i3').owner.get() is await newconf2.option('od1.i3').owner.get() is owners.default assert await newconf1.option('od1.i3').owner.get() is owners.user # await newconf1.option('od1.i3').value.reset() assert await mix.option('od1.i3').value.get() is await newconf1.option('od1.i3').value.get() is await newconf2.option('od1.i3').value.get() is None assert await mix.option('od1.i3').owner.get() is await newconf1.option('od1.i3').owner.get() is await newconf2.option('od1.i3').owner.get() is owners.default # assert await mix.session.id() == await mix.session.id() await delete_sessions(mix) @pytest.mark.asyncio async def test_reset(): mix = await make_mixconfig() assert await mix.option('od1.i2').value.get() == 1 await mix.option('od1.i2').value.set(2) newconf1 = await mix.config('conf1') newconf2 = await mix.config('conf2') await newconf1.option('od1.i2').value.set(3) assert await mix.option('od1.i2').value.get() == 2 assert await newconf1.option('od1.i2').value.get() == 3 assert await newconf2.option('od1.i2').value.get() == 2 await mix.config.reset() assert await mix.option('od1.i2').value.get() == 1 assert await newconf1.option('od1.i2').value.get() == 3 assert await newconf2.option('od1.i2').value.get() == 1 await delete_sessions(mix) @pytest.mark.asyncio async def test_default(): mix = await make_mixconfig() newconf1 = await mix.config('conf1') newconf2 = await mix.config('conf2') assert await mix.option('od1.i2').value.get() == await newconf1.option('od1.i2').value.get() == await newconf2.option('od1.i2').value.get() == 1 assert await mix.option('od1.i2').owner.get() is await newconf1.option('od1.i2').owner.get() is await newconf2.option('od1.i2').owner.get() is owners.default # await mix.option('od1.i2').value.set(3) assert await mix.option('od1.i2').value.get() == await newconf1.option('od1.i2').value.get() == await newconf2.option('od1.i2').value.get() == 3 assert await mix.option('od1.i2').owner.get() is await newconf1.option('od1.i2').owner.get() is await newconf2.option('od1.i2').owner.get() is owners.mix1 # await newconf1.option('od1.i2').value.set(2) assert await mix.option('od1.i2').value.get() == await newconf2.option('od1.i2').value.get() == 3 assert await newconf1.option('od1.i2').value.get() == 2 assert await mix.option('od1.i2').owner.get() is await newconf2.option('od1.i2').owner.get() is owners.mix1 assert await newconf1.option('od1.i2').owner.get() is owners.user # await mix.option('od1.i2').value.set(4) assert await mix.option('od1.i2').value.get() == await newconf2.option('od1.i2').value.get() == 4 assert await newconf1.option('od1.i2').value.get() == 2 assert await mix.option('od1.i2').owner.get() is await newconf2.option('od1.i2').owner.get() is owners.mix1 assert await newconf1.option('od1.i2').owner.get() is owners.user # await mix.option('od1.i2').value.reset() assert await mix.option('od1.i2').value.get() == await newconf2.option('od1.i2').value.get() == 1 assert await newconf1.option('od1.i2').value.get() == 2 assert await mix.option('od1.i2').owner.get() is await newconf2.option('od1.i2').owner.get() is owners.default assert await newconf1.option('od1.i2').owner.get() is owners.user # await newconf1.option('od1.i2').value.reset() assert await mix.option('od1.i2').value.get() == await newconf1.option('od1.i2').value.get() == await newconf2.option('od1.i2').value.get() == 1 assert await mix.option('od1.i2').owner.get() is await newconf1.option('od1.i2').owner.get() is await newconf2.option('od1.i2').owner.get() is owners.default await delete_sessions(mix) @pytest.mark.asyncio async def test_contexts(): mix = await make_mixconfig() errors = await mix.value.set('od1.i2', 6, only_config=True) newconf1 = await mix.config('conf1') assert await mix.option('od1.i2').value.get() == 1 assert await mix.option('od1.i2').owner.get() == owners.default assert await newconf1.option('od1.i2').value.get() == await newconf1.option('od1.i2').value.get() == 6 assert await newconf1.option('od1.i2').owner.get() == await newconf1.option('od1.i2').owner.get() is owners.user assert len(errors) == 0 await delete_sessions(mix) @pytest.mark.asyncio async def test_find(): mix = await make_mixconfig() ret = list(await mix.option.find('i2')) assert len(ret) == 1 assert 1 == await ret[0].value.get() ret = await mix.option.find('i2', first=True) assert 1 == await ret.value.get() assert await mix.value.dict() == {'od1.i4': 2, 'od1.i1': None, 'od1.i3': None, 'od1.i2': 1, 'od1.i5': [2]} await delete_sessions(mix) @pytest.mark.asyncio async def test_mix_mix(): mix = await make_mixconfig(double=True) newmix = await mix.config('mix') newconf1 = await mix.config('mix.conf1') newconf2 = await mix.config('mix.conf2') assert await mix.option('od1.i2').value.get() == await newmix.option('od1.i2').value.get() == await newconf1.option('od1.i2').value.get() == await newconf2.option('od1.i2').value.get() == 1 assert await mix.option('od1.i2').owner.get() is await newmix.option('od1.i2').owner.get() is await newconf1.option('od1.i2').owner.get() is await newconf2.option('od1.i2').owner.get() is owners.default # await mix.option('od1.i2').value.set(3) assert await mix.option('od1.i2').value.get() == await newmix.option('od1.i2').value.get() == await newconf1.option('od1.i2').value.get() == await newconf2.option('od1.i2').value.get() == 3 assert await mix.option('od1.i2').owner.get() is await newmix.option('od1.i2').owner.get() is await newconf1.option('od1.i2').owner.get() is await newconf2.option('od1.i2').owner.get() is owners.mix1 # await newconf1.option('od1.i2').value.set(2) assert await mix.option('od1.i2').value.get() == await newmix.option('od1.i2').value.get() == await newconf2.option('od1.i2').value.get() == 3 assert await newconf1.option('od1.i2').value.get() == 2 assert await mix.option('od1.i2').owner.get() is await newmix.option('od1.i2').owner.get() is await newconf2.option('od1.i2').owner.get() is owners.mix1 assert await newconf1.option('od1.i2').owner.get() is owners.user # await newmix.option('od1.i2').value.set(4) assert await mix.option('od1.i2').value.get() == 3 assert await newmix.option('od1.i2').value.get() == await newconf2.option('od1.i2').value.get() == 4 assert await newconf1.option('od1.i2').value.get() == 2 assert await mix.option('od1.i2').owner.get() is owners.mix1 assert await newmix.option('od1.i2').owner.get() is await newconf2.option('od1.i2').owner.get() is owners.mix2 assert await newconf1.option('od1.i2').owner.get() is owners.user # await newmix.option('od1.i2').value.reset() assert await mix.option('od1.i2').value.get() == await newmix.option('od1.i2').value.get() == await newconf2.option('od1.i2').value.get() == 3 assert await newconf1.option('od1.i2').value.get() == 2 assert await mix.option('od1.i2').owner.get() is await newmix.option('od1.i2').owner.get() is await newconf2.option('od1.i2').owner.get() is owners.mix1 assert await newconf1.option('od1.i2').owner.get() is owners.user # await mix.option('od1.i2').value.reset() assert await mix.option('od1.i2').value.get() == await newmix.option('od1.i2').value.get() == await newconf2.option('od1.i2').value.get() == 1 assert await newconf1.option('od1.i2').value.get() == 2 assert await mix.option('od1.i2').owner.get() is await newmix.option('od1.i2').owner.get() is await newconf2.option('od1.i2').owner.get() is owners.default assert await newconf1.option('od1.i2').owner.get() is owners.user # await newconf1.option('od1.i2').value.reset() assert await mix.option('od1.i2').value.get() == await newmix.option('od1.i2').value.get() == await newconf1.option('od1.i2').value.get() == await newconf2.option('od1.i2').value.get() == 1 assert await mix.option('od1.i2').owner.get() is await newmix.option('od1.i2').owner.get() is await newconf1.option('od1.i2').owner.get() is await newconf2.option('od1.i2').owner.get() is owners.default await delete_sessions(mix) @pytest.mark.asyncio async def test_mix_mix_set(): mix = await make_mixconfig(double=True) errors1 = await mix.value.set('od1.i1', 7, only_config=True) errors2 = await mix.value.set('od1.i6', 7, only_config=True) assert len(errors1) == 0 assert len(errors2) == 2 ret = await mix.config('mix.conf1') conf1 = ret._config_bag.context ret = await mix.config('mix.conf2') conf2 = ret._config_bag.context newconf1 = await mix.config('mix.conf1') newconf2 = await mix.config('mix.conf2') assert await newconf1.option('od1.i1').value.get() == await newconf2.option('od1.i1').value.get() == 7 # dconfigs = [] ret = await mix.config.find('i1', value=7) for conf in await ret.config.list(): dconfigs.append(conf._config_bag.context) assert [conf1, conf2] == dconfigs await newconf1.option('od1.i1').value.set(8) # dconfigs = [] ret = await mix.config.find('i1') for conf in await ret.config.list(): dconfigs.append(conf._config_bag.context) assert [conf1, conf2] == dconfigs ret = await mix.config.find('i1', value=7) assert conf2 == list(await ret.config.list())[0]._config_bag.context ret = await mix.config.find('i1', value=8) assert conf1 == list(await ret.config.list())[0]._config_bag.context # dconfigs = [] ret = await mix.config.find('i5', value=2) for conf in await ret.config.list(): dconfigs.append(conf._config_bag.context) assert [conf1, conf2] == dconfigs # with pytest.raises(AttributeError): await mix.config.find('i1', value=10) with pytest.raises(AttributeError): await mix.config.find('not', value=10) with pytest.raises(AttributeError): await mix.config.find('i6') with pytest.raises(ValueError): await mix.value.set('od1.i6', 7, only_config=True, force_default=True) with pytest.raises(ValueError): await mix.value.set('od1.i6', 7, only_config=True, force_default_if_same=True) with pytest.raises(ValueError): await mix.value.set('od1.i6', 7, only_config=True, force_dont_change_value=True) await delete_sessions(mix) @pytest.mark.asyncio async def test_mix_unconsistent(): i1 = IntOption('i1', '') i2 = IntOption('i2', '', default=1) i3 = IntOption('i3', '') i4 = IntOption('i4', '', default=2) od1 = OptionDescription('od1', '', [i1, i2, i3, i4]) od2 = OptionDescription('od2', '', [od1]) od3 = OptionDescription('od3', '', [od1]) conf1 = await Config(od2, session_id='conf1') conf2 = await Config(od2, session_id='conf2') conf3 = await Config(od2, session_id='conf3') i5 = IntOption('i5', '') od4 = OptionDescription('od4', '', [i5]) conf4 = await Config(od4, session_id='conf4') mix = await MixConfig(od2, [conf1, conf2]) await mix.owner.set(owners.mix1) with pytest.raises(TypeError): await MixConfig(od2, "string", session_id='error') await delete_session('error') # same descr but conf1 already in mix assert len(list(await conf1.config.parents())) == 1 assert len(list(await conf3.config.parents())) == 0 new_mix = await MixConfig(od2, [conf1, conf3]) assert len(list(await conf1.config.parents())) == 2 assert len(list(await conf3.config.parents())) == 1 # not same descr tmix = await MixConfig(od2, [conf3, conf4]) await delete_sessions([mix, conf3, conf4, tmix, new_mix]) @pytest.mark.asyncio async def test_mix_leadership(): ip_admin_eth0 = StrOption('ip_admin_eth0', "ip", multi=True) netmask_admin_eth0 = StrOption('netmask_admin_eth0', "mask", multi=True, properties=('hidden',)) interface1 = Leadership('ip_admin_eth0', '', [ip_admin_eth0, netmask_admin_eth0]) od = OptionDescription('root', '', [interface1]) conf1 = await Config(od, session_id='conf1') conf2 = await Config(od, session_id='conf2') mix = await MixConfig(od, [conf1, conf2]) await mix.property.read_only() ret = await mix.config.find('ip_admin_eth0') configs = await ret.config.list() assert len(configs) == 2 assert conf1._config_bag.context == configs[0]._config_bag.context assert conf2._config_bag.context == configs[1]._config_bag.context ret = await mix.config.find('netmask_admin_eth0') configs = await ret.config.list() assert len(configs) == 2 assert conf1._config_bag.context == configs[0]._config_bag.context assert conf2._config_bag.context == configs[1]._config_bag.context await mix.property.read_write() with pytest.raises(AttributeError): await mix.config.find('netmask_admin_eth0') ret = await mix.unrestraint.config.find('netmask_admin_eth0') configs = await ret.config.list() assert len(configs) == 2 assert conf1._config_bag.context == configs[0]._config_bag.context assert conf2._config_bag.context == configs[1]._config_bag.context await mix.property.read_only() ret = await mix.config.find('netmask_admin_eth0') configs = await ret.config.list() assert len(configs) == 2 assert conf1._config_bag.context == configs[0]._config_bag.context assert conf2._config_bag.context == configs[1]._config_bag.context await delete_sessions(mix) @pytest.mark.asyncio async def test_mix_leadership_value2(): ip_admin_eth0 = StrOption('ip_admin_eth0', "ip", multi=True) netmask_admin_eth0 = StrOption('netmask_admin_eth0', "mask", multi=True, properties=('hidden',)) interface1 = Leadership('ip_admin_eth0', '', [ip_admin_eth0, netmask_admin_eth0]) od = OptionDescription('root', '', [interface1]) conf1 = await Config(od, session_id='conf1') conf2 = await Config(od, session_id='conf2') mix = await MixConfig(od, [conf1, conf2], session_id="mix") newconf1 = await mix.config('conf1') await newconf1.option('ip_admin_eth0.ip_admin_eth0').value.set(['192.168.1.8']) assert await newconf1.option('ip_admin_eth0.netmask_admin_eth0', 0).value.get() == None #FIXME devrait raise ! assert await newconf1.option('ip_admin_eth0.ip_admin_eth0', 0).value.get() == None # await newconf1.option('ip_admin_eth0.ip_admin_eth0').value.reset() # await mix.option('ip_admin_eth0.ip_admin_eth0').value.set(['192.168.1.1']) assert await newconf1.option('ip_admin_eth0.netmask_admin_eth0', 0).value.get() == None await mix.option('ip_admin_eth0.netmask_admin_eth0', 0).value.set('255.255.255.0') assert await newconf1.option('ip_admin_eth0.netmask_admin_eth0', 0).value.get() == '255.255.255.0' await mix.option('ip_admin_eth0.netmask_admin_eth0', 0).value.set('255.255.0.0') assert await newconf1.option('ip_admin_eth0.netmask_admin_eth0', 0).value.get() == '255.255.0.0' # await newconf1.option('ip_admin_eth0.ip_admin_eth0').value.set(['192.168.1.1']) assert await newconf1.option('ip_admin_eth0.netmask_admin_eth0', 0).value.get() == '255.255.0.0' await delete_sessions(mix) @pytest.mark.asyncio async def test_mix_leadership_value_default(): ip_admin_eth0 = StrOption('ip_admin_eth0', "ip", multi=True, default=['192.168.1.1']) netmask_admin_eth0 = StrOption('netmask_admin_eth0', "mask", multi=True) interface1 = Leadership('ip_admin_eth0', '', [ip_admin_eth0, netmask_admin_eth0]) od = OptionDescription('root', '', [interface1]) conf1 = await Config(od, session_id='conf1') conf2 = await Config(od, session_id='conf2') mix = await MixConfig(od, [conf1, conf2]) newconf1 = await mix.config('conf1') assert await newconf1.option('ip_admin_eth0.netmask_admin_eth0', 0).value.get() == None # await mix.option('ip_admin_eth0.ip_admin_eth0').value.set(['192.168.1.1']) assert await newconf1.option('ip_admin_eth0.netmask_admin_eth0', 0).value.get() == None # await mix.option('ip_admin_eth0.netmask_admin_eth0', 0).value.set('255.255.255.0') assert await newconf1.option('ip_admin_eth0.netmask_admin_eth0', 0).value.get() == '255.255.255.0' # await mix.option('ip_admin_eth0.netmask_admin_eth0', 0).value.set('255.255.0.0') assert await newconf1.option('ip_admin_eth0.netmask_admin_eth0', 0).value.get() == '255.255.0.0' # await newconf1.option('ip_admin_eth0.ip_admin_eth0').value.set(['192.168.1.1']) assert await newconf1.option('ip_admin_eth0.netmask_admin_eth0', 0).value.get() == '255.255.0.0' await delete_sessions(mix) @pytest.mark.asyncio async def test_mix_leadership_owners(): ip_admin_eth0 = StrOption('ip_admin_eth0', "ip", multi=True) netmask_admin_eth0 = StrOption('netmask_admin_eth0', "mask", multi=True, properties=('hidden',)) interface1 = Leadership('ip_admin_eth0', '', [ip_admin_eth0, netmask_admin_eth0]) od = OptionDescription('root', '', [interface1]) conf1 = await Config(od, session_id='conf1') conf2 = await Config(od, session_id='conf2') mix = await MixConfig(od, [conf1, conf2]) await mix.owner.set(owners.mix1) newconf1 = await mix.config('conf1') assert await newconf1.option('ip_admin_eth0.ip_admin_eth0').owner.isdefault() with pytest.raises(LeadershipError): await newconf1.option('ip_admin_eth0.netmask_admin_eth0', 0).owner.isdefault() # await newconf1.option('ip_admin_eth0.ip_admin_eth0').value.set(['192.168.1.1']) assert await newconf1.option('ip_admin_eth0.ip_admin_eth0').owner.get() == owners.user assert await newconf1.option('ip_admin_eth0.netmask_admin_eth0', 0).owner.isdefault() # await newconf1.option('ip_admin_eth0.ip_admin_eth0').value.reset() assert await newconf1.option('ip_admin_eth0.ip_admin_eth0').owner.isdefault() # await mix.option('ip_admin_eth0.ip_admin_eth0').value.set(['192.168.1.1']) assert await newconf1.option('ip_admin_eth0.ip_admin_eth0').owner.get() == owners.mix1 assert await newconf1.option('ip_admin_eth0.netmask_admin_eth0', 0).owner.isdefault() # await mix.option('ip_admin_eth0.netmask_admin_eth0', 0).value.set('255.255.255.0') assert await newconf1.option('ip_admin_eth0.ip_admin_eth0').owner.get() == owners.mix1 assert await newconf1.option('ip_admin_eth0.netmask_admin_eth0', 0).owner.get() == owners.mix1 # await mix.option('ip_admin_eth0.netmask_admin_eth0', 0).value.set('255.255.0.0') assert await newconf1.option('ip_admin_eth0.ip_admin_eth0').owner.get() == owners.mix1 assert await newconf1.option('ip_admin_eth0.netmask_admin_eth0', 0).owner.get() == owners.mix1 # await newconf1.option('ip_admin_eth0.ip_admin_eth0').value.set(['192.168.1.1']) assert await newconf1.option('ip_admin_eth0.ip_admin_eth0').owner.get() == owners.user assert await newconf1.option('ip_admin_eth0.netmask_admin_eth0', 0).owner.get() == owners.mix1 await delete_sessions(mix) @pytest.mark.asyncio async def test_mix_force_default(): ip_admin_eth0 = StrOption('ip_admin_eth0', "ip", multi=True) netmask_admin_eth0 = StrOption('netmask_admin_eth0', "mask", multi=True, properties=('hidden',)) interface1 = Leadership('ip_admin_eth0', '', [ip_admin_eth0, netmask_admin_eth0]) od = OptionDescription('root', '', [interface1]) conf1 = await Config(od, session_id='conf1') conf2 = await Config(od, session_id='conf2') mix = await MixConfig(od, [conf1, conf2]) await mix.property.read_write() await mix.owner.set('mix1') assert await mix.option('ip_admin_eth0.ip_admin_eth0').value.get() == [] newconf1 = await mix.config('conf1') newconf2 = await mix.config('conf2') assert await newconf1.option('ip_admin_eth0.ip_admin_eth0').value.get() == [] assert await newconf2.option('ip_admin_eth0.ip_admin_eth0').value.get() == [] # errors = await mix.value.set('ip_admin_eth0.ip_admin_eth0', ['192.168.1.1']) assert len(errors) == 0 assert await mix.option('ip_admin_eth0.ip_admin_eth0').value.get() == ['192.168.1.1'] assert await newconf1.option('ip_admin_eth0.ip_admin_eth0').value.get() == ['192.168.1.1'] assert await newconf2.option('ip_admin_eth0.ip_admin_eth0').value.get() == ['192.168.1.1'] # await newconf1.option('ip_admin_eth0.ip_admin_eth0').value.set(['192.168.1.2']) assert await mix.option('ip_admin_eth0.ip_admin_eth0').value.get() == ['192.168.1.1'] assert await newconf1.option('ip_admin_eth0.ip_admin_eth0').value.get() == ['192.168.1.2'] assert await newconf2.option('ip_admin_eth0.ip_admin_eth0').value.get() == ['192.168.1.1'] # errors = await mix.value.set('ip_admin_eth0.ip_admin_eth0', ['192.168.1.3']) assert len(errors) == 0 assert await mix.option('ip_admin_eth0.ip_admin_eth0').value.get() == ['192.168.1.3'] assert await newconf1.option('ip_admin_eth0.ip_admin_eth0').value.get() == ['192.168.1.2'] assert await newconf2.option('ip_admin_eth0.ip_admin_eth0').value.get() == ['192.168.1.3'] # errors = await mix.value.set('ip_admin_eth0.ip_admin_eth0', ['192.168.1.4'], force_default=True) assert len(errors) == 0 assert await mix.option('ip_admin_eth0.ip_admin_eth0').value.get() == ['192.168.1.4'] assert await newconf1.option('ip_admin_eth0.ip_admin_eth0').value.get() == ['192.168.1.4'] assert await newconf2.option('ip_admin_eth0.ip_admin_eth0').value.get() == ['192.168.1.4'] await delete_sessions(mix) @pytest.mark.asyncio async def test_mix_force_dont_change_value(): ip_admin_eth0 = StrOption('ip_admin_eth0', "ip", multi=True) netmask_admin_eth0 = StrOption('netmask_admin_eth0', "mask", multi=True, properties=('hidden',)) interface1 = Leadership('ip_admin_eth0', '', [ip_admin_eth0, netmask_admin_eth0]) od = OptionDescription('root', '', [interface1]) conf1 = await Config(od, session_id='conf1') conf2 = await Config(od, session_id='conf2') mix = await MixConfig(od, [conf1, conf2]) await mix.property.read_write() await mix.owner.set('mix1') assert await mix.option('ip_admin_eth0.ip_admin_eth0').value.get() == [] newconf1 = await mix.config('conf1') newconf2 = await mix.config('conf2') assert await newconf1.option('ip_admin_eth0.ip_admin_eth0').value.get() == [] assert await newconf2.option('ip_admin_eth0.ip_admin_eth0').value.get() == [] await newconf1.option('ip_admin_eth0.ip_admin_eth0').value.set(['192.168.1.4']) assert await newconf1.option('ip_admin_eth0.ip_admin_eth0').value.get() == ['192.168.1.4'] assert await newconf2.option('ip_admin_eth0.ip_admin_eth0').value.get() == [] assert await newconf1.option('ip_admin_eth0.ip_admin_eth0').owner.get() is owners.user assert await newconf2.option('ip_admin_eth0.ip_admin_eth0').owner.isdefault() errors = await mix.value.set('ip_admin_eth0.ip_admin_eth0', ['192.168.1.4'], force_dont_change_value=True) assert len(errors) == 0 assert await mix.option('ip_admin_eth0.ip_admin_eth0').value.get() == ['192.168.1.4'] assert await newconf1.option('ip_admin_eth0.ip_admin_eth0').value.get() == ['192.168.1.4'] assert await newconf2.option('ip_admin_eth0.ip_admin_eth0').value.get() == [] assert await newconf1.option('ip_admin_eth0.ip_admin_eth0').owner.get() is owners.user assert await newconf2.option('ip_admin_eth0.ip_admin_eth0').owner.get() is owners.user await delete_sessions(mix) @pytest.mark.asyncio async def test_mix_force_default_if_same(): ip_admin_eth0 = StrOption('ip_admin_eth0', "ip", multi=True) netmask_admin_eth0 = StrOption('netmask_admin_eth0', "mask", multi=True, properties=('hidden',)) interface1 = Leadership('ip_admin_eth0', '', [ip_admin_eth0, netmask_admin_eth0]) od = OptionDescription('root', '', [interface1]) conf1 = await Config(od, session_id='conf1') conf2 = await Config(od, session_id='conf2') mix = await MixConfig(od, [conf1, conf2]) await mix.property.read_write() await mix.owner.set('mix1') # assert await mix.option('ip_admin_eth0.ip_admin_eth0').value.get() == [] newconf1 = await mix.config('conf1') newconf2 = await mix.config('conf2') assert await newconf1.option('ip_admin_eth0.ip_admin_eth0').value.get() == [] assert await newconf2.option('ip_admin_eth0.ip_admin_eth0').value.get() == [] # await newconf1.option('ip_admin_eth0.ip_admin_eth0').value.set(['192.168.1.4']) assert await newconf1.option('ip_admin_eth0.ip_admin_eth0').value.get() == ['192.168.1.4'] assert await newconf2.option('ip_admin_eth0.ip_admin_eth0').value.get() == [] assert await newconf1.option('ip_admin_eth0.ip_admin_eth0').owner.get() is owners.user assert await newconf2.option('ip_admin_eth0.ip_admin_eth0').owner.isdefault() errors = await mix.value.set('ip_admin_eth0.ip_admin_eth0', ['192.168.1.4'], force_default_if_same=True) assert len(errors) == 0 assert await mix.option('ip_admin_eth0.ip_admin_eth0').value.get() == ['192.168.1.4'] assert await newconf1.option('ip_admin_eth0.ip_admin_eth0').value.get() == ['192.168.1.4'] assert await newconf2.option('ip_admin_eth0.ip_admin_eth0').value.get() == ['192.168.1.4'] assert await newconf1.option('ip_admin_eth0.ip_admin_eth0').owner.get() is owners.mix1 assert await newconf2.option('ip_admin_eth0.ip_admin_eth0').owner.get() is owners.mix1 # await newconf1.option('ip_admin_eth0.ip_admin_eth0').value.set(['192.168.1.3']) assert await newconf1.option('ip_admin_eth0.ip_admin_eth0').value.get() == ['192.168.1.3'] assert await newconf2.option('ip_admin_eth0.ip_admin_eth0').value.get() == ['192.168.1.4'] assert await newconf1.option('ip_admin_eth0.ip_admin_eth0').owner.get() is owners.user assert await newconf2.option('ip_admin_eth0.ip_admin_eth0').owner.get() is owners.mix1 errors = await mix.value.set('ip_admin_eth0.ip_admin_eth0', ['192.168.1.5'], force_default_if_same=True) assert len(errors) == 0 assert await mix.option('ip_admin_eth0.ip_admin_eth0').value.get() == ['192.168.1.5'] assert await newconf1.option('ip_admin_eth0.ip_admin_eth0').value.get() == ['192.168.1.3'] assert await newconf2.option('ip_admin_eth0.ip_admin_eth0').value.get() == ['192.168.1.5'] assert await newconf1.option('ip_admin_eth0.ip_admin_eth0').owner.get() is owners.user assert await newconf2.option('ip_admin_eth0.ip_admin_eth0').owner.get() is owners.mix1 await delete_sessions(mix) @pytest.mark.asyncio async def test_mix_force_default_if_same_and_dont_change(): ip_admin_eth0 = StrOption('ip_admin_eth0', "ip", multi=True) netmask_admin_eth0 = StrOption('netmask_admin_eth0', "mask", multi=True, properties=('hidden',)) interface1 = Leadership('ip_admin_eth0', '', [ip_admin_eth0, netmask_admin_eth0]) od = OptionDescription('root', '', [interface1]) conf1 = await Config(od, session_id='conf1') conf2 = await Config(od, session_id='conf2') mix = await MixConfig(od, [conf1, conf2]) await mix.property.read_write() await mix.owner.set('mix1') # assert await mix.option('ip_admin_eth0.ip_admin_eth0').value.get() == [] newconf1 = await mix.config('conf1') newconf2 = await mix.config('conf2') assert await newconf1.option('ip_admin_eth0.ip_admin_eth0').value.get() == [] assert await newconf2.option('ip_admin_eth0.ip_admin_eth0').value.get() == [] # await newconf1.option('ip_admin_eth0.ip_admin_eth0').value.set(['192.168.1.4']) assert await newconf1.option('ip_admin_eth0.ip_admin_eth0').value.get() == ['192.168.1.4'] assert await newconf2.option('ip_admin_eth0.ip_admin_eth0').value.get() == [] assert await newconf1.option('ip_admin_eth0.ip_admin_eth0').owner.get() is owners.user assert await newconf2.option('ip_admin_eth0.ip_admin_eth0').owner.isdefault() errors = await mix.value.set('ip_admin_eth0.ip_admin_eth0', ['192.168.1.4'], force_default_if_same=True, force_dont_change_value=True) assert len(errors) == 0 assert await mix.option('ip_admin_eth0.ip_admin_eth0').value.get() == ['192.168.1.4'] assert await newconf1.option('ip_admin_eth0.ip_admin_eth0').value.get() == ['192.168.1.4'] assert await newconf2.option('ip_admin_eth0.ip_admin_eth0').value.get() == [] assert await newconf1.option('ip_admin_eth0.ip_admin_eth0').owner.get() is owners.mix1 assert await newconf2.option('ip_admin_eth0.ip_admin_eth0').owner.get() is owners.user # await newconf1.option('ip_admin_eth0.ip_admin_eth0').value.set(['192.168.1.3']) assert await newconf1.option('ip_admin_eth0.ip_admin_eth0').value.get() == ['192.168.1.3'] assert await newconf2.option('ip_admin_eth0.ip_admin_eth0').value.get() == [] assert await newconf1.option('ip_admin_eth0.ip_admin_eth0').owner.get() is owners.user assert await newconf2.option('ip_admin_eth0.ip_admin_eth0').owner.get() is owners.user errors = await mix.value.set('ip_admin_eth0.ip_admin_eth0', ['192.168.1.5'], force_default_if_same=True, force_dont_change_value=True) assert len(errors) == 0 assert await mix.option('ip_admin_eth0.ip_admin_eth0').value.get() == ['192.168.1.5'] assert await newconf1.option('ip_admin_eth0.ip_admin_eth0').value.get() == ['192.168.1.3'] assert await newconf2.option('ip_admin_eth0.ip_admin_eth0').value.get() == [] assert await newconf1.option('ip_admin_eth0.ip_admin_eth0').owner.get() is owners.user assert await newconf2.option('ip_admin_eth0.ip_admin_eth0').owner.get() is owners.user await delete_sessions(mix) @pytest.mark.asyncio async def test_mix_force_default_and_dont_change(): ip_admin_eth0 = StrOption('ip_admin_eth0', "ip", multi=True) netmask_admin_eth0 = StrOption('netmask_admin_eth0', "mask", multi=True, properties=('hidden',)) interface1 = Leadership('ip_admin_eth0', '', [ip_admin_eth0, netmask_admin_eth0]) od = OptionDescription('root', '', [interface1]) conf1 = await Config(od, session_id='rconf1') conf2 = await Config(od, session_id='rconf2') mix = await MixConfig(od, [conf1, conf2]) await mix.property.read_write() await mix.owner.set('mix1') with pytest.raises(ValueError): await mix.value.set('ip_admin_eth0.ip_admin_eth0', ['192.168.1.4'], force_default=True, force_dont_change_value=True) await delete_sessions(mix) @pytest.mark.asyncio async def test_mix_properties_mix(): ip_admin_eth0 = NetworkOption('ip_admin_eth0', "ip", multi=True, default=['192.168.1.1']) netmask_admin_eth0 = NetmaskOption('netmask_admin_eth0', "mask", multi=True, validators=[Calculation(valid_network_netmask, Params((ParamOption(ip_admin_eth0), ParamSelfOption())))]) interface1 = Leadership('ip_admin_eth0', '', [ip_admin_eth0, netmask_admin_eth0], properties=('disabled',)) od = OptionDescription('root', '', [interface1]) conf1 = await Config(od, session_id='conf1') conf2 = await Config(od, session_id='conf2') await conf1.property.read_write() await conf2.property.read_write() mix = await MixConfig(od, [conf1, conf2]) await mix.property.read_write() newconf1 = await mix.config('conf1') assert await newconf1.value.dict() == {} await delete_sessions(mix) @pytest.mark.asyncio async def test_mix_exception_mix(): ip_admin_eth0 = NetworkOption('ip_admin_eth0', "ip", multi=True, default=['192.168.1.1']) netmask_admin_eth0 = NetmaskOption('netmask_admin_eth0', "mask", Calculation(raise_exception), multi=True, validators=[Calculation(valid_network_netmask, Params((ParamOption(ip_admin_eth0), ParamSelfOption())))]) interface1 = Leadership('ip_admin_eth0', '', [ip_admin_eth0, netmask_admin_eth0]) od = OptionDescription('root', '', [interface1]) conf1 = await Config(od, session_id='conf1') conf2 = await Config(od, session_id='conf2') mix = await MixConfig(od, [conf1, conf2]) await mix.property.read_write() with pytest.raises(ConfigError): await conf1.value.dict() await delete_sessions(mix) @pytest.mark.asyncio async def test_mix_callback(): val1 = StrOption('val1', "", 'val') val2 = StrOption('val2', "", Calculation(return_value, Params(ParamOption(val1)))) val3 = StrOption('val3', "", Calculation(return_value, Params(ParamValue('yes')))) val4 = StrOption('val4', "", Calculation(return_value, Params(kwargs={'value': ParamOption(val1)}))) val5 = StrOption('val5', "", Calculation(return_value, Params(kwargs={'value': ParamValue('yes')}))) maconfig = OptionDescription('rootconfig', '', [val1, val2, val3, val4, val5]) cfg = await Config(maconfig, session_id='cfg') mix = await MixConfig(maconfig, [cfg]) await mix.property.read_write() newcfg = await mix.config('cfg') assert await newcfg.value.dict() == {'val3': 'yes', 'val2': 'val', 'val1': 'val', 'val5': 'yes', 'val4': 'val'} await newcfg.option('val1').value.set('new') assert await newcfg.value.dict() == {'val3': 'yes', 'val2': 'new', 'val1': 'new', 'val5': 'yes', 'val4': 'new'} await newcfg.option('val1').value.reset() await mix.option('val1').value.set('new') assert await newcfg.value.dict() == {'val3': 'yes', 'val2': 'new', 'val1': 'new', 'val5': 'yes', 'val4': 'new'} await newcfg.option('val4').value.set('new1') assert await newcfg.value.dict() == {'val3': 'yes', 'val2': 'new', 'val1': 'new', 'val5': 'yes', 'val4': 'new1'} await newcfg.option('val4').value.reset() await mix.option('val4').value.set('new1') assert await newcfg.value.dict() == {'val3': 'yes', 'val2': 'new', 'val1': 'new', 'val5': 'yes', 'val4': 'new1'} await mix.option('val4').value.reset() await delete_sessions(mix) @pytest.mark.asyncio async def test_mix_callback_follower(): val = StrOption('val', "", default='val') val1 = StrOption('val1', "", [Calculation(return_value, Params(ParamOption(val)))], multi=True) val3 = StrOption('val2', "", Calculation(return_value, Params(ParamOption(val1))), multi=True) val4 = StrOption('val3', "", Calculation(return_value, Params(ParamOption(val1))), multi=True) interface1 = Leadership('val1', '', [val1, val3, val4]) od = OptionDescription('root', '', [interface1]) maconfig = OptionDescription('rootconfig', '', [val, interface1]) cfg = await Config(maconfig, session_id='cfg1') mix = await MixConfig(maconfig, [cfg]) await mix.property.read_write() newcfg1 = await mix.config('cfg1') assert await newcfg1.value.dict() == {'val1.val2': ['val'], 'val1.val1': ['val'], 'val1.val3': ['val'], 'val': 'val'} # await newcfg1.option('val').value.set('val1') assert await newcfg1.value.dict() == {'val1.val2': ['val1'], 'val1.val1': ['val1'], 'val1.val3': ['val1'], 'val': 'val1'} # await newcfg1.option('val').value.reset() await mix.option('val').value.set('val1') assert await newcfg1.value.dict() == {'val1.val2': ['val1'], 'val1.val1': ['val1'], 'val1.val3': ['val1'], 'val': 'val1'} # await mix.option('val').value.reset() await newcfg1.option('val1.val2', 0).value.set('val2') assert await newcfg1.value.dict() == {'val1.val2': ['val2'], 'val1.val1': ['val'], 'val1.val3': ['val'], 'val': 'val'} # await newcfg1.option('val1.val2', 0).value.reset() assert await newcfg1.value.dict() == {'val1.val2': ['val'], 'val1.val1': ['val'], 'val1.val3': ['val'], 'val': 'val'} # await mix.option('val1.val2', 0).value.set('val2') assert await newcfg1.value.dict() == {'val1.val2': ['val2'], 'val1.val1': ['val'], 'val1.val3': ['val'], 'val': 'val'} # await mix.option('val1.val1').value.set(['val']) assert await newcfg1.value.dict() == {'val1.val2': ['val2'], 'val1.val1': ['val'], 'val1.val3': ['val'], 'val': 'val'} # await newcfg1.option('val1.val3', 0).value.set('val6') assert await newcfg1.value.dict() == {'val1.val2': ['val2'], 'val1.val1': ['val'], 'val1.val3': ['val6'], 'val': 'val'} # await mix.option('val1.val2', 0).value.reset() await newcfg1.option('val1.val3', 0).value.reset() await newcfg1.option('val1.val1').value.set(['val3']) assert await newcfg1.value.dict() == {'val1.val2': ['val3'], 'val1.val1': ['val3'], 'val1.val3': ['val3'], 'val': 'val'} # await newcfg1.option('val1.val1').value.reset() assert await newcfg1.value.dict() == {'val1.val2': ['val'], 'val1.val1': ['val'], 'val1.val3': ['val'], 'val': 'val'} # await mix.option('val1.val1').value.set(['val3']) assert await newcfg1.value.dict() == {'val1.val2': ['val3'], 'val1.val1': ['val3'], 'val1.val3': ['val3'], 'val': 'val'} # await newcfg1.option('val1.val2', 0).value.set('val2') assert await newcfg1.value.dict() == {'val1.val2': ['val2'], 'val1.val1': ['val3'], 'val1.val3': ['val3'], 'val': 'val'} # await mix.option('val1.val1').value.set(['val3', 'rah']) assert await newcfg1.value.dict() == {'val1.val2': ['val2', 'rah'], 'val1.val1': ['val3', 'rah'], 'val1.val3': ['val3', 'rah'], 'val': 'val'} # await mix.option('val1.val1').value.pop(1) await mix.option('val1.val1').value.set(['val4']) assert await newcfg1.value.dict() == {'val1.val2': ['val2'], 'val1.val1': ['val4'], 'val1.val3': ['val4'], 'val': 'val'} await delete_sessions(mix) @pytest.mark.asyncio async def test_meta_reset(): ip_admin_eth0 = StrOption('ip_admin_eth0', "ip", multi=True) netmask_admin_eth0 = StrOption('netmask_admin_eth0', "mask", multi=True, properties=('hidden',)) interface1 = Leadership('ip_admin_eth0', '', [ip_admin_eth0, netmask_admin_eth0]) od0 = OptionDescription('root', '', [interface1]) ip_admin_eth0 = StrOption('ip_admin_eth0', "ip", multi=True) netmask_admin_eth0 = StrOption('netmask_admin_eth0', "mask", multi=True, properties=('hidden',)) interface1 = Leadership('ip_admin_eth0', '', [ip_admin_eth0, netmask_admin_eth0]) od1 = OptionDescription('root', '', [interface1]) ip_admin_eth0 = StrOption('ip_admin_eth0', "ip", multi=True) netmask_admin_eth0 = StrOption('netmask_admin_eth0', "mask", multi=True, properties=('hidden',)) interface1 = Leadership('ip_admin_eth0', '', [ip_admin_eth0, netmask_admin_eth0]) od2 = OptionDescription('root', '', [interface1]) conf1 = await Config(od0, session_id='conf1') conf2 = await Config(od1, session_id='conf2') mix = await MixConfig(od2, [conf1, conf2]) await mix.property.read_write() await mix.owner.set('mix1') assert await mix.option('ip_admin_eth0.ip_admin_eth0').value.get() == [] newconf1 = await mix.config('conf1') newconf2 = await mix.config('conf2') assert await newconf1.option('ip_admin_eth0.ip_admin_eth0').value.get() == [] assert await newconf2.option('ip_admin_eth0.ip_admin_eth0').value.get() == [] errors = await mix.value.set('ip_admin_eth0.ip_admin_eth0', ['192.168.1.1']) assert len(errors) == 0 assert await mix.option('ip_admin_eth0.ip_admin_eth0').value.get() == ['192.168.1.1'] assert await newconf1.option('ip_admin_eth0.ip_admin_eth0').value.get() == ['192.168.1.1'] assert await newconf2.option('ip_admin_eth0.ip_admin_eth0').value.get() == ['192.168.1.1'] await newconf1.option('ip_admin_eth0.ip_admin_eth0').value.set(['192.168.1.2']) assert await mix.option('ip_admin_eth0.ip_admin_eth0').value.get() == ['192.168.1.1'] assert await newconf1.option('ip_admin_eth0.ip_admin_eth0').value.get() == ['192.168.1.2'] assert await newconf2.option('ip_admin_eth0.ip_admin_eth0').value.get() == ['192.168.1.1'] await mix.value.reset('ip_admin_eth0.ip_admin_eth0') assert await mix.option('ip_admin_eth0.ip_admin_eth0').value.get() == [] assert await newconf1.option('ip_admin_eth0.ip_admin_eth0').value.get() == [] assert await newconf2.option('ip_admin_eth0.ip_admin_eth0').value.get() == [] await delete_sessions(mix) @pytest.mark.asyncio async def test_mix_properties_mix_copy(): ip_admin_eth0 = NetworkOption('ip_admin_eth0', "ip", multi=True, default=['192.168.1.1']) netmask_admin_eth0 = NetmaskOption('netmask_admin_eth0', "mask", multi=True, properties=('disabled',)) interface0 = OptionDescription('ip_admin_eth0', '', [ip_admin_eth0, netmask_admin_eth0]) ip_admin_eth0 = NetworkOption('ip_admin_eth0', "ip", multi=True, default=['192.168.1.1']) netmask_admin_eth0 = NetmaskOption('netmask_admin_eth0', "mask", multi=True, properties=('disabled',)) interface1 = OptionDescription('ip_admin_eth0', '', [ip_admin_eth0, netmask_admin_eth0]) ip_admin_eth0 = NetworkOption('ip_admin_eth0', "ip", multi=True, default=['192.168.1.1']) netmask_admin_eth0 = NetmaskOption('netmask_admin_eth0', "mask", multi=True, properties=('disabled',)) interface2 = OptionDescription('ip_admin_eth0', '', [ip_admin_eth0, netmask_admin_eth0]) conf1 = await Config(interface0, session_id='conf1') conf2 = await Config(interface1, session_id='conf2') await conf1.property.read_write() await conf2.property.read_write() mix = await MixConfig(interface2, [conf1, conf2], session_id='mix1') await mix.property.read_write() newconf1 = await mix.config('conf1') conf3 = await newconf1.config.copy(session_id='conf3') newconf3 = await mix.config('conf3') # old fashion mix2 = await conf3.config.metaconfig() assert await mix.session.id() == await mix2.session.id() # new method mix2 = list(await conf3.config.parents()) assert len(mix2) == 1 assert await mix.session.id() == await mix2[0].session.id() newconf2 = await mix.config('conf2') assert await newconf1.value.dict() == {'ip_admin_eth0': ['192.168.1.1']} assert await conf2.value.dict() == {'ip_admin_eth0': ['192.168.1.1']} assert await newconf3.value.dict() == {'ip_admin_eth0': ['192.168.1.1']} await mix.option('ip_admin_eth0').value.set(['192.168.1.2']) assert await newconf1.value.dict() == {'ip_admin_eth0': ['192.168.1.2']} assert await conf2.value.dict() == {'ip_admin_eth0': ['192.168.1.2']} assert await newconf3.value.dict() == {'ip_admin_eth0': ['192.168.1.2']} ret = await mix.value.set('ip_admin_eth0', ['192.168.1.3'], force_default_if_same=True) assert await newconf1.value.dict() == {'ip_admin_eth0': ['192.168.1.3']} assert await conf2.value.dict() == {'ip_admin_eth0': ['192.168.1.3']} assert await newconf3.value.dict() == {'ip_admin_eth0': ['192.168.1.3']} await delete_sessions(mix) @pytest.mark.asyncio async def test_mix_properties_mix_deepcopy(): ip_admin_eth0 = NetworkOption('ip_admin_eth0', "ip", multi=True, default=['192.168.1.1']) netmask_admin_eth0 = NetmaskOption('netmask_admin_eth0', "mask", multi=True, properties=('disabled',)) interface0 = OptionDescription('ip_admin_eth0', '', [ip_admin_eth0, netmask_admin_eth0]) ip_admin_eth0 = NetworkOption('ip_admin_eth0', "ip", multi=True, default=['192.168.1.1']) netmask_admin_eth0 = NetmaskOption('netmask_admin_eth0', "mask", multi=True, properties=('disabled',)) interface1 = OptionDescription('ip_admin_eth0', '', [ip_admin_eth0, netmask_admin_eth0]) ip_admin_eth0 = NetworkOption('ip_admin_eth0', "ip", multi=True, default=['192.168.1.1']) netmask_admin_eth0 = NetmaskOption('netmask_admin_eth0', "mask", multi=True, properties=('disabled',)) interface2 = OptionDescription('ip_admin_eth0', '', [ip_admin_eth0, netmask_admin_eth0]) conf1 = await Config(interface0, session_id='conf1') conf2 = await Config(interface1, session_id='conf2') await conf1.property.read_write() await conf2.property.read_write() mix = await MixConfig(interface2, [conf1, conf2]) await mix.permissive.add('hidden') await mix.property.read_write() newconf1 = await mix.config('conf1') newconf2 = await mix.config('conf2') mix2 = await newconf1.config.deepcopy(session_id='conf3') newconf3 = await mix2.config('conf3') assert mix != mix2 assert await mix.permissive.get() == await mix2.permissive.get() assert await newconf1.value.dict() == {'ip_admin_eth0': ['192.168.1.1']} assert await newconf2.value.dict() == {'ip_admin_eth0': ['192.168.1.1']} assert await newconf3.value.dict() == {'ip_admin_eth0': ['192.168.1.1']} await mix.option('ip_admin_eth0').value.set(['192.168.1.2']) assert await newconf1.value.dict() == {'ip_admin_eth0': ['192.168.1.2']} assert await newconf2.value.dict() == {'ip_admin_eth0': ['192.168.1.2']} assert await newconf3.value.dict() == {'ip_admin_eth0': ['192.168.1.1']} await mix.value.set('ip_admin_eth0', ['192.168.1.3'], force_default_if_same=True) assert await newconf1.value.dict() == {'ip_admin_eth0': ['192.168.1.3']} assert await newconf2.value.dict() == {'ip_admin_eth0': ['192.168.1.3']} assert await newconf3.value.dict() == {'ip_admin_eth0': ['192.168.1.1']} await delete_sessions([mix, mix2]) @pytest.mark.asyncio async def test_mix_properties_submix_deepcopy(): ip_admin_eth0 = NetworkOption('ip_admin_eth0', "ip", multi=True, default=['192.168.1.1']) netmask_admin_eth0 = NetmaskOption('netmask_admin_eth0', "mask", multi=True, properties=('disabled',)) interface0 = OptionDescription('ip_admin_eth0', '', [ip_admin_eth0, netmask_admin_eth0]) ip_admin_eth0 = NetworkOption('ip_admin_eth0', "ip", multi=True, default=['192.168.1.1']) netmask_admin_eth0 = NetmaskOption('netmask_admin_eth0', "mask", multi=True, properties=('disabled',)) interface1 = OptionDescription('ip_admin_eth0', '', [ip_admin_eth0, netmask_admin_eth0]) ip_admin_eth0 = NetworkOption('ip_admin_eth0', "ip", multi=True, default=['192.168.1.1']) netmask_admin_eth0 = NetmaskOption('netmask_admin_eth0', "mask", multi=True, properties=('disabled',)) interface2 = OptionDescription('ip_admin_eth0', '', [ip_admin_eth0, netmask_admin_eth0]) conf1 = await Config(interface0, session_id='conf1') await conf1.property.read_write() mix1 = await MixConfig(interface1, [conf1], session_id='mix1') mix2 = await MixConfig(interface2, [mix1], session_id='mix2') mix_copy = await conf1.config.deepcopy(session_id='conf2', metaconfig_prefix='copy_') assert await mix_copy.session.id() == 'copy_mix2' ret1 = await mix_copy.config('copy_mix1') assert await ret1.session.id() == 'copy_mix1' ret2 = await mix_copy.config('copy_mix1.conf2') assert await ret2.session.id() == 'conf2' await delete_sessions([mix1, mix2, mix_copy, ret1, ret2]) @pytest.mark.asyncio async def test_mix_properties_submix_deepcopy_owner(): ip_admin_eth0 = NetworkOption('ip_admin_eth0', "ip") netmask_admin_eth0 = NetmaskOption('netmask_admin_eth1', "mask") interface0 = OptionDescription('ip_admin_eth0', '', [ip_admin_eth0, netmask_admin_eth0]) ip_admin_eth0 = NetworkOption('ip_admin_eth1', "ip") netmask_admin_eth0 = NetmaskOption('netmask_admin_eth0', "mask") interface1 = OptionDescription('ip_admin_eth0', '', [ip_admin_eth0, netmask_admin_eth0]) ip_admin_eth0 = NetworkOption('ip_admin_eth0', "ip") netmask_admin_eth0 = NetmaskOption('netmask_admin_eth0', "mask") interface2 = OptionDescription('ip_admin_eth0', '', [ip_admin_eth0, netmask_admin_eth0]) conf1 = await Config(interface0, session_id='conf1') await conf1.owner.set('conf1_user') await conf1.property.read_write() mix1 = await MixConfig(interface1, [conf1], session_id='mix1') await mix1.owner.set('mix1_user') mix2 = await MixConfig(interface2, [mix1], session_id='mix2') await mix2.owner.set('mix2_user') # await conf1.option('ip_admin_eth0').value.set('192.168.0.1') assert await conf1.option('ip_admin_eth0').owner.get() == 'conf1_user' await mix2.option('ip_admin_eth0').value.set('192.168.0.3') assert await mix2.option('ip_admin_eth0').owner.get() == 'mix2_user' # mix2_copy = await conf1.config.deepcopy(session_id='conf2', metaconfig_prefix='copy_') await mix2_copy.option('netmask_admin_eth0').value.set('255.255.255.255') assert await mix2_copy.option('ip_admin_eth0').value.get() == '192.168.0.3' assert await mix2_copy.option('ip_admin_eth0').owner.get() == 'mix2_user' assert await mix2_copy.option('netmask_admin_eth0').owner.get() == 'mix2_user' # mix1_copy = await mix2_copy.config('copy_mix1') await mix1_copy.option('netmask_admin_eth0').value.set('255.255.255.255') # conf2 = await mix1_copy.config('conf2') await conf2.owner.set('conf2_user') await conf2.option('netmask_admin_eth1').value.set('255.255.255.255') assert await conf2.option('netmask_admin_eth1').owner.get() == 'conf2_user' assert await conf2.option('ip_admin_eth0').value.get() == '192.168.0.1' assert await conf2.option('ip_admin_eth0').owner.get() == 'conf1_user' await delete_sessions([mix1, mix2, mix1_copy, mix2_copy]) @pytest.mark.asyncio async def test_mix_properties_mix_set_value(): ip_admin_eth0 = NetworkOption('ip_admin_eth1', "ip", multi=True, default=['192.168.1.1']) netmask_admin_eth0 = NetmaskOption('netmask_admin_eth0', "mask", multi=True, properties=('disabled',)) interface0 = OptionDescription('ip_admin_eth0', '', [ip_admin_eth0, netmask_admin_eth0]) ip_admin_eth0 = NetworkOption('ip_admin_eth0', "ip", multi=True, default=['192.168.1.1']) netmask_admin_eth0 = NetmaskOption('netmask_admin_eth1', "mask", multi=True, properties=('disabled',)) interface1 = OptionDescription('ip_admin_eth0', '', [ip_admin_eth0, netmask_admin_eth0]) ip_admin_eth0 = NetworkOption('ip_admin_eth0', "ip", multi=True, default=['192.168.1.1']) netmask_admin_eth0 = NetmaskOption('netmask_admin_eth0', "mask", multi=True, properties=('disabled',)) interface2 = OptionDescription('ip_admin_eth0', '', [ip_admin_eth0, netmask_admin_eth0]) conf1 = await Config(interface0, session_id='conf1') conf2 = await Config(interface1, session_id='conf2') await conf1.property.read_write() await conf2.property.read_write() mix = await MixConfig(interface2, [conf1, conf2]) await mix.property.read_write() newconf2 = await mix.config('conf2') assert await newconf2.value.dict() == {'ip_admin_eth0': ['192.168.1.1']} ret = await mix.value.set('netmask_admin_eth0', ['255.255.255.255'], only_config=True) assert len(ret) == 2 assert isinstance(ret[0], PropertiesOptionError) assert isinstance(ret[1], AttributeError) del ret[1] del ret[0] del ret ret = await mix.value.set('netmask_admin_eth0', ['255.255.255.255'], force_default=True) assert len(ret) == 2 assert isinstance(ret[0], AttributeError) assert isinstance(ret[1], PropertiesOptionError) del ret[1] del ret[0] del ret ret = await mix.value.set('netmask_admin_eth0', ['255.255.255.255'], force_dont_change_value=True) assert len(ret) == 3 assert isinstance(ret[0], PropertiesOptionError) assert isinstance(ret[1], AttributeError) assert isinstance(ret[2], PropertiesOptionError) del ret[2] del ret[1] del ret[0] del ret ret = await mix.value.set('netmask_admin_eth0', ['255.255.255.255'], force_default_if_same=True) assert len(ret) == 2 assert isinstance(ret[0], AttributeError) assert isinstance(ret[1], PropertiesOptionError) del ret[1] del ret[0] del ret ret = await mix.value.set('ip_admin_eth0', '255.255.255.255', only_config=True) assert len(ret) == 2 assert isinstance(ret[0], AttributeError) assert isinstance(ret[1], ValueError) del ret[1] del ret[0] del ret ret = await mix.value.set('ip_admin_eth0', '255.255.255.255', force_default=True) assert len(ret) == 2 assert isinstance(ret[0], AttributeError) assert isinstance(ret[1], ValueError) del ret[1] del ret[0] del ret ret = await mix.value.set('ip_admin_eth0', '255.255.255.255', force_dont_change_value=True) assert len(ret) == 2 assert isinstance(ret[0], AttributeError) assert isinstance(ret[1], ValueError) del ret[1] del ret[0] del ret ret = await mix.value.set('ip_admin_eth0', '255.255.255.255', force_default_if_same=True) assert len(ret) == 2 assert isinstance(ret[0], AttributeError) assert isinstance(ret[1], ValueError) del ret[1] del ret[0] del ret await delete_sessions(mix) @pytest.mark.asyncio async def test_mix_different_default(): ip_admin_eth0 = NetworkOption('ip_admin_eth0', "ip", multi=True, default=['192.168.1.1']) interface0 = OptionDescription('ip_admin_eth0', '', [ip_admin_eth0]) ip_admin_eth0 = NetworkOption('ip_admin_eth1', "ip", multi=True, default=['192.168.1.2']) interface1 = OptionDescription('ip_admin_eth0', '', [ip_admin_eth0]) ip_admin_eth0 = NetworkOption('ip_admin_eth1', "ip", multi=True, default=['192.168.1.3']) interface2 = OptionDescription('ip_admin_eth0', '', [ip_admin_eth0]) ip_admin_eth0 = NetworkOption('ip_admin_eth0', "ip", multi=True, default=['192.168.1.4']) ip_admin_eth1 = NetworkOption('ip_admin_eth1', "ip", multi=True, default=['192.168.1.5']) interface3 = OptionDescription('ip_admin_eth0', '', [ip_admin_eth0, ip_admin_eth1]) ip_admin_eth0 = NetworkOption('ip_admin_eth0', "ip", multi=True, default=['192.168.1.6']) interface4 = OptionDescription('ip_admin_eth0', '', [ip_admin_eth0]) conf1 = await Config(interface0, session_id='conf1') await conf1.property.read_write() conf2 = await Config(interface1, session_id='conf2') await conf2.property.read_write() mix = await MixConfig(interface2, [conf1, conf2], session_id='submix1') mix = await MixConfig(interface3, [mix], session_id='submix2') mix = await MixConfig(interface4, [mix]) await mix.property.read_write() # assert await mix.value.dict() == {'ip_admin_eth0': ['192.168.1.6']} newsubmix2 = await mix.config('submix2') newsubmix1 = await mix.config('submix2.submix1') newconf1 = await mix.config('submix2.submix1.conf1') newconf2 = await mix.config('submix2.submix1.conf2') assert await newsubmix2.value.dict() == {'ip_admin_eth0': ['192.168.1.4'], 'ip_admin_eth1': ['192.168.1.5']} assert await newsubmix1.value.dict() == {'ip_admin_eth1': ['192.168.1.3']} assert await newconf2.value.dict() == {'ip_admin_eth1': ['192.168.1.2']} assert await newconf1.value.dict() == {'ip_admin_eth0': ['192.168.1.1']} # await mix.option('ip_admin_eth0').value.set(['192.168.1.7']) assert await mix.value.dict() == {'ip_admin_eth0': ['192.168.1.7']} assert await newsubmix2.value.dict() == {'ip_admin_eth0': ['192.168.1.7'], 'ip_admin_eth1': ['192.168.1.5']} assert await newsubmix1.value.dict() == {'ip_admin_eth1': ['192.168.1.3']} assert await newconf2.value.dict() == {'ip_admin_eth1': ['192.168.1.2']} assert await newconf1.value.dict() == {'ip_admin_eth0': ['192.168.1.7']} # await newsubmix2.option('ip_admin_eth0').value.set(['192.168.1.8']) assert await mix.value.dict() == {'ip_admin_eth0': ['192.168.1.7']} assert await newsubmix2.value.dict() == {'ip_admin_eth0': ['192.168.1.8'], 'ip_admin_eth1': ['192.168.1.5']} assert await newsubmix1.value.dict() == {'ip_admin_eth1': ['192.168.1.3']} assert await newconf2.value.dict() == {'ip_admin_eth1': ['192.168.1.2']} assert await newconf1.value.dict() == {'ip_admin_eth0': ['192.168.1.8']} # with pytest.raises(AttributeError): await newsubmix1.option('ip_admin_eth0').value.set(['192.168.1.9']) assert await mix.value.dict() == {'ip_admin_eth0': ['192.168.1.7']} assert await newsubmix2.value.dict() == {'ip_admin_eth0': ['192.168.1.8'], 'ip_admin_eth1': ['192.168.1.5']} assert await newsubmix1.value.dict() == {'ip_admin_eth1': ['192.168.1.3']} assert await newconf2.value.dict() == {'ip_admin_eth1': ['192.168.1.2']} assert await newconf1.value.dict() == {'ip_admin_eth0': ['192.168.1.8']} # with pytest.raises(AttributeError): await newconf2.option('ip_admin_eth0').value.set(['192.168.1.9']) assert await mix.value.dict() == {'ip_admin_eth0': ['192.168.1.7']} assert await newsubmix2.value.dict() == {'ip_admin_eth0': ['192.168.1.8'], 'ip_admin_eth1': ['192.168.1.5']} assert await newsubmix1.value.dict() == {'ip_admin_eth1': ['192.168.1.3']} assert await newconf2.value.dict() == {'ip_admin_eth1': ['192.168.1.2']} assert await newconf1.value.dict() == {'ip_admin_eth0': ['192.168.1.8']} # await newconf1.option('ip_admin_eth0').value.set(['192.168.1.9']) assert await mix.value.dict() == {'ip_admin_eth0': ['192.168.1.7']} assert await newsubmix2.value.dict() == {'ip_admin_eth0': ['192.168.1.8'], 'ip_admin_eth1': ['192.168.1.5']} assert await newsubmix1.value.dict() == {'ip_admin_eth1': ['192.168.1.3']} assert await newconf2.value.dict() == {'ip_admin_eth1': ['192.168.1.2']} assert await newconf1.value.dict() == {'ip_admin_eth0': ['192.168.1.9']} # with pytest.raises(AttributeError): await mix.option('ip_admin_eth1').value.set(['192.168.1.10']) assert await mix.value.dict() == {'ip_admin_eth0': ['192.168.1.7']} assert await newsubmix2.value.dict() == {'ip_admin_eth0': ['192.168.1.8'], 'ip_admin_eth1': ['192.168.1.5']} assert await newsubmix1.value.dict() == {'ip_admin_eth1': ['192.168.1.3']} assert await newconf2.value.dict() == {'ip_admin_eth1': ['192.168.1.2']} assert await newconf1.value.dict() == {'ip_admin_eth0': ['192.168.1.9']} # await newsubmix2.option('ip_admin_eth1').value.set(['192.168.1.10']) assert await mix.value.dict() == {'ip_admin_eth0': ['192.168.1.7']} assert await newsubmix2.value.dict() == {'ip_admin_eth0': ['192.168.1.8'], 'ip_admin_eth1': ['192.168.1.10']} assert await newsubmix1.value.dict() == {'ip_admin_eth1': ['192.168.1.10']} assert await newconf2.value.dict() == {'ip_admin_eth1': ['192.168.1.10']} assert await newconf1.value.dict() == {'ip_admin_eth0': ['192.168.1.9']} # await newsubmix1.option('ip_admin_eth1').value.set(['192.168.1.11']) assert await mix.value.dict() == {'ip_admin_eth0': ['192.168.1.7']} assert await newsubmix2.value.dict() == {'ip_admin_eth0': ['192.168.1.8'], 'ip_admin_eth1': ['192.168.1.10']} assert await newsubmix1.value.dict() == {'ip_admin_eth1': ['192.168.1.11']} assert await newconf2.value.dict() == {'ip_admin_eth1': ['192.168.1.11']} assert await newconf1.value.dict() == {'ip_admin_eth0': ['192.168.1.9']} # await newconf2.option('ip_admin_eth1').value.set(['192.168.1.12']) assert await mix.value.dict() == {'ip_admin_eth0': ['192.168.1.7']} assert await newsubmix2.value.dict() == {'ip_admin_eth0': ['192.168.1.8'], 'ip_admin_eth1': ['192.168.1.10']} assert await newsubmix1.value.dict() == {'ip_admin_eth1': ['192.168.1.11']} assert await newconf2.value.dict() == {'ip_admin_eth1': ['192.168.1.12']} assert await newconf1.value.dict() == {'ip_admin_eth0': ['192.168.1.9']} # with pytest.raises(AttributeError): await newconf1.option('ip_admin_eth1').value.set(['192.168.1.13']) assert await mix.value.dict() == {'ip_admin_eth0': ['192.168.1.7']} assert await newsubmix2.value.dict() == {'ip_admin_eth0': ['192.168.1.8'], 'ip_admin_eth1': ['192.168.1.10']} assert await newsubmix1.value.dict() == {'ip_admin_eth1': ['192.168.1.11']} assert await newconf2.value.dict() == {'ip_admin_eth1': ['192.168.1.12']} assert await newconf1.value.dict() == {'ip_admin_eth0': ['192.168.1.9']} await delete_sessions(mix) @pytest.mark.asyncio async def test_mix_different_default_reset(): ip_admin_eth0 = NetworkOption('ip_admin_eth0', "ip", multi=True, default=['192.168.1.1']) interface0 = OptionDescription('ip_admin_eth0', '', [ip_admin_eth0]) ip_admin_eth0 = NetworkOption('ip_admin_eth1', "ip", multi=True, default=['192.168.1.2']) interface1 = OptionDescription('ip_admin_eth0', '', [ip_admin_eth0]) ip_admin_eth0 = NetworkOption('ip_admin_eth1', "ip", multi=True, default=['192.168.1.3']) interface2 = OptionDescription('ip_admin_eth0', '', [ip_admin_eth0]) ip_admin_eth0 = NetworkOption('ip_admin_eth0', "ip", multi=True, default=['192.168.1.4']) ip_admin_eth1 = NetworkOption('ip_admin_eth1', "ip", multi=True, default=['192.168.1.5']) interface3 = OptionDescription('ip_admin_eth0', '', [ip_admin_eth0, ip_admin_eth1]) ip_admin_eth0 = NetworkOption('ip_admin_eth0', "ip", multi=True, default=['192.168.1.6']) interface4 = OptionDescription('ip_admin_eth0', '', [ip_admin_eth0]) conf1 = await Config(interface0, session_id='conf1') await conf1.property.read_write() conf2 = await Config(interface1, session_id='conf2') await conf2.property.read_write() mix = await MixConfig(interface2, [conf1, conf2], session_id='submix1') mix = await MixConfig(interface3, [mix], session_id='submix2') mix = await MixConfig(interface4, [mix]) await mix.property.read_write() # await mix.option('ip_admin_eth0').value.set(['192.168.1.7']) submix2 = await mix.config('submix2') submix1 = await mix.config('submix2.submix1') conf1 = await mix.config('submix2.submix1.conf1') conf2 = await mix.config('submix2.submix1.conf2') await submix2.option('ip_admin_eth0').value.set(['192.168.1.8']) await submix2.option('ip_admin_eth1').value.set(['192.168.1.10']) await submix1.option('ip_admin_eth1').value.set(['192.168.1.11']) await conf2.option('ip_admin_eth1').value.set(['192.168.1.12']) await conf1.option('ip_admin_eth0').value.set(['192.168.1.9']) assert await mix.value.dict() == {'ip_admin_eth0': ['192.168.1.7']} assert await submix2.value.dict() == {'ip_admin_eth0': ['192.168.1.8'], 'ip_admin_eth1': ['192.168.1.10']} assert await submix1.value.dict() == {'ip_admin_eth1': ['192.168.1.11']} assert await conf2.value.dict() == {'ip_admin_eth1': ['192.168.1.12']} assert await conf1.value.dict() == {'ip_admin_eth0': ['192.168.1.9']} # await mix.value.reset('ip_admin_eth0') assert await mix.value.dict() == {'ip_admin_eth0': ['192.168.1.6']} assert await submix2.value.dict() == {'ip_admin_eth0': ['192.168.1.4'], 'ip_admin_eth1': ['192.168.1.10']} assert await submix1.value.dict() == {'ip_admin_eth1': ['192.168.1.11']} assert await conf2.value.dict() == {'ip_admin_eth1': ['192.168.1.12']} assert await conf1.value.dict() == {'ip_admin_eth0': ['192.168.1.1']} # await mix.value.reset('ip_admin_eth1') assert await mix.value.dict() == {'ip_admin_eth0': ['192.168.1.6']} assert await submix2.value.dict() == {'ip_admin_eth0': ['192.168.1.4'], 'ip_admin_eth1': ['192.168.1.5']} assert await submix1.value.dict() == {'ip_admin_eth1': ['192.168.1.3']} assert await conf2.value.dict() == {'ip_admin_eth1': ['192.168.1.2']} assert await conf1.value.dict() == {'ip_admin_eth0': ['192.168.1.1']} await delete_sessions(mix) @pytest.mark.asyncio async def test_mix_pop_config(): od = make_description() config1 = await Config(od, session_id='config1') config2 = await Config(od, session_id='config2') mix = await MixConfig(od, [config1, config2]) await mix.option('od1.i1').value.set(2) # assert len(list(await mix.config.list())) == 2 newconfig1 = await mix.config('config1') assert await newconfig1.value.dict() == {'od1.i1': 2, 'od1.i2': 1, 'od1.i3': None, 'od1.i4': 2, 'od1.i5': [2], 'od1.i6': None} newconf1 = await mix.config.pop('config1') try: await mix.config('config1') except ConfigError: pass else: raise Exception('must raise') assert await newconf1.value.dict() == {'od1.i1': None, 'od1.i2': 1, 'od1.i3': None, 'od1.i4': 2, 'od1.i5': [2], 'od1.i6': None} # assert len(list(await mix.config.list())) == 1 with pytest.raises(ConfigError): await mix.config.pop('newconf1') await delete_sessions([mix, newconf1]) @pytest.mark.asyncio async def test_mix_add_config(): od = make_description() config1 = await Config(od, session_id='config1') config2 = await Config(od, session_id='config2') mix = await MixConfig(od, [config1, config2]) await mix.option('od1.i1').value.set(2) # assert len(list(await mix.config.list())) == 2 config = await Config(od, session_id='new') assert await config.value.dict() == {'od1.i1': None, 'od1.i2': 1, 'od1.i3': None, 'od1.i4': 2, 'od1.i5': [2], 'od1.i6': None} await mix.config.add(config) # assert len(list(await mix.config.list())) == 3 assert await config.value.dict() == {'od1.i1': 2, 'od1.i2': 1, 'od1.i3': None, 'od1.i4': 2, 'od1.i5': [2], 'od1.i6': None} # with pytest.raises(ConflictError): await mix.config.add(config) await delete_sessions(mix) @pytest.mark.asyncio async def test_mix_add_config_readd(): od = make_description() mix = await MixConfig(od, []) mix2 = await MixConfig(od, []) # config = await Config(od, session_id='new') await mix.config.add(config) await mix2.config.add(config) assert len(list(await config.config.parents())) == 2 await delete_sessions([mix, mix2]) @pytest.mark.asyncio async def test_mix_new_config_readd(): od = make_description() mix = await MixConfig(od, []) assert len(list(await mix.config.list())) == 0 mix2 = await mix.config.new('mix2') assert len(list(await mix.config.list())) == 1 await delete_sessions([mix, mix2]) @pytest.mark.asyncio async def test_meta_new_mixconfig(): od = make_description() cfg = await Config(od, session_id='cfg1') meta = await MetaConfig([cfg]) mix = await meta.config.new('mixconfig', type="mixconfig") assert isinstance(mix, MixConfig) await delete_sessions(meta) @pytest.mark.asyncio async def test_meta_get_mixconfig(): od = make_description() cfg = await Config(od, session_id='conf1') meta = await MetaConfig([cfg]) await meta.config.new('mixconfig', type="mixconfig") assert isinstance(await meta.config.get('mixconfig'), MixConfig) with pytest.raises(ConfigError): await meta.config.get('unknown') newmix = await meta.config.get('mixconfig') await newmix.config.add(await MixConfig(od, [], session_id='mixconfig2')) assert isinstance(await newmix.config.get('mixconfig2'), MixConfig) await delete_sessions(meta)