from importlib import import_module import pytest try: from tiramisu3 import list_sessions, delete_session as _delete_session except: from tiramisu import list_sessions, delete_session as _delete_session from .storage import STORAGE from risotto import services from risotto.context import Context #from risotto.services import load_services from risotto.dispatcher import dispatcher SOURCE_NAME = 'test' SERVERMODEL_NAME = 'sm1' def setup_module(module): # load_services(['config'], # validate=False) services.link_to_dispatcher(dispatcher, limit_services=['setting'], validate=False) config_module = dispatcher.get_service('config') config_module.save_storage = STORAGE #dispatcher.set_module('server', import_module(f'.server', 'fake_services'), True) #dispatcher.set_module('servermodel', import_module(f'.servermodel', 'fake_services'), True) def setup_function(function): config_module = dispatcher.get_service('config') config_module.server = {} config_module.servermodel = {} async def delete_session(): # delete all sessions config_module = dispatcher.get_service('config') for session in await list_sessions(storage=config_module.save_storage): await _delete_session(storage=config_module.save_storage, session_id=session) def get_fake_context(module_name): risotto_context = Context() risotto_context.username = 'test' risotto_context.paths.append(f'{module_name}.on_join') risotto_context.type = None return risotto_context async def onjoin(source=True): config_module = dispatcher.get_service('config') assert config_module.servermodel == {} assert config_module.server == {} await delete_session() # #config_module.cache_root_path = 'tests/data' await dispatcher.load() await dispatcher.on_join(truncate=True) if source: fake_context = get_fake_context('config') await dispatcher.call('v1', 'setting.source.create', fake_context, source_name=SOURCE_NAME, source_directory='tests/data', ) INTERNAL_SOURCE = {'source_name': 'internal', 'source_directory': '/srv/risotto/seed/internal'} TEST_SOURCE = {'source_name': 'test', 'source_directory': 'tests/data'} ############################################################################################################################## # Source / Release ############################################################################################################################## @pytest.mark.asyncio async def test_source_on_join(): # onjoin must create internal source sources = [INTERNAL_SOURCE] await onjoin(False) fake_context = get_fake_context('config') assert await dispatcher.call('v1', 'setting.source.list', fake_context, ) == sources await delete_session() @pytest.mark.asyncio async def test_source_create(): sources = [INTERNAL_SOURCE, TEST_SOURCE] await onjoin() config_module = dispatcher.get_service('config') assert list(config_module.servermodel.keys()) == ['last_base'] assert list(config_module.server) == [] fake_context = get_fake_context('config') assert await dispatcher.call('v1', 'setting.source.list', fake_context, ) == sources await delete_session() @pytest.mark.asyncio async def test_source_describe(): await onjoin() fake_context = get_fake_context('config') assert await dispatcher.call('v1', 'setting.source.describe', fake_context, source_name='internal', ) == INTERNAL_SOURCE assert await dispatcher.call('v1', 'setting.source.describe', fake_context, source_name=SOURCE_NAME, ) == TEST_SOURCE await delete_session() @pytest.mark.asyncio async def test_release_internal_list(): releases = [{'release_distribution': 'last', 'release_name': 'none', 'source_name': 'internal'}] await onjoin() fake_context = get_fake_context('config') assert await dispatcher.call('v1', 'setting.source.release.list', fake_context, source_name='internal', ) == releases await delete_session() @pytest.mark.asyncio async def test_release_list(): releases = [{'release_distribution': 'last', 'release_name': '1', 'source_name': 'test'}] await onjoin() fake_context = get_fake_context('config') assert await dispatcher.call('v1', 'setting.source.release.list', fake_context, source_name='test', ) == releases await delete_session() @pytest.mark.asyncio async def test_release_describe(): await onjoin() fake_context = get_fake_context('config') assert await dispatcher.call('v1', 'setting.source.release.describe', fake_context, source_name='internal', release_distribution='last', ) == {'release_distribution': 'last', 'release_name': 'none', 'source_name': 'internal'} assert await dispatcher.call('v1', 'setting.source.release.describe', fake_context, source_name='test', release_distribution='last', ) == {'release_distribution': 'last', 'release_name': '1', 'source_name': 'test'} await delete_session() ############################################################################################################################## # Servermodel ############################################################################################################################## async def create_servermodel(name=SERVERMODEL_NAME, parents_name=['base'], ): fake_context = get_fake_context('config') await dispatcher.call('v1', 'setting.servermodel.create', fake_context, servermodel_name=name, servermodel_description='servermodel 1', parents_name=parents_name, source_name=SOURCE_NAME, release_distribution='last', ) @pytest.mark.asyncio async def test_servermodel_created(): await onjoin() config_module = dispatcher.get_service('config') # assert list(config_module.servermodel) == ['last_base'] await create_servermodel() assert list(config_module.servermodel) == ['last_base', 'last_sm1'] assert not list(await config_module.servermodel['last_base'].config.parents()) assert len(list(await config_module.servermodel['last_sm1'].config.parents())) == 1 await delete_session() # # #@pytest.mark.asyncio #async def test_servermodel_herited_created(): # config_module = dispatcher.get_service('config') # fake_context = get_fake_context('config') # config_module.cache_root_path = 'tests/data' # await config_module.on_join(fake_context) # # # assert list(config_module.servermodel) == [1, 2] # await dispatcher.publish('v1', # 'servermodel.created', # fake_context, # servermodel_id=3, # servermodel_name='name3', # release_id=1, # servermodel_description='name3', # servermodel_parents_id=[1]) # assert list(config_module.servermodel) == [1, 2, 3] # assert len(list(await config_module.servermodel[3].config.parents())) == 1 # await delete_session() # # #@pytest.mark.asyncio #async def test_servermodel_multi_herited_created(): # config_module = dispatcher.get_service('config') # fake_context = get_fake_context('config') # config_module.cache_root_path = 'tests/data' # await config_module.on_join(fake_context) # # # assert list(config_module.servermodel) == [1, 2] # await dispatcher.publish('v1', # 'servermodel.created', # fake_context, # servermodel_id=3, # servermodel_name='name3', # release_id=1, # servermodel_description='name3', # servermodel_parents_id=[1, 2]) # assert list(config_module.servermodel) == [1, 2, 3] # assert len(list(await config_module.servermodel[3].config.parents())) == 2 # await delete_session() # # ##@pytest.mark.asyncio ##async def test_servermodel_updated_not_exists(): ## config_module = dispatcher.get_service('config') ## fake_context = get_fake_context('config') ## config_module.cache_root_path = 'tests/data' ## await config_module.on_join(fake_context) ## # ## assert list(config_module.servermodel) == [1, 2] ## await dispatcher.publish('v1', ## 'servermodel.updated', ## fake_context, ## servermodel_id=3, ## servermodel_name='name3', ## release_id=1, ## servermodel_description='name3', ## servermodel_parents_id=[1, 2]) ## assert list(config_module.servermodel) == [1, 2, 3] ## assert len(list(await config_module.servermodel[3].config.parents())) == 2 ## await delete_session() ## ## ## @pytest.mark.asyncio ## async def test_servermodel_updated1(): ## config_module = dispatcher.get_service('config') ## fake_context = get_fake_context('config') ## config_module.cache_root_path = 'tests/data' ## await config_module.on_join(fake_context) ## # ## assert list(config_module.servermodel) == [1, 2] ## metaconfig1 = config_module.servermodel[1] ## metaconfig2 = config_module.servermodel[2] ## mixconfig1 = (await metaconfig1.config.list())[0] ## mixconfig2 = (await metaconfig2.config.list())[0] ## assert len(list(await metaconfig1.config.parents())) == 0 ## assert len(list(await metaconfig2.config.parents())) == 1 ## assert len(list(await mixconfig1.config.list())) == 1 ## assert len(list(await mixconfig2.config.list())) == 0 ## # ## await dispatcher.publish('v1', ## 'servermodel.updated', ## fake_context, ## servermodel_id=1, ## servermodel_name='name1-1', ## release_id=1, ## servermodel_description='name1-1') ## assert set(config_module.servermodel) == {1, 2} ## assert config_module.servermodel[1].information.get('servermodel_name') == 'name1-1' ## assert metaconfig1 != config_module.servermodel[1] ## assert metaconfig2 == config_module.servermodel[2] ## metaconfig1 = config_module.servermodel[1] ## assert mixconfig1 != next(metaconfig1.config.list()) ## mixconfig1 = next(metaconfig1.config.list()) ## # ## assert len(list(await metaconfig1.config.parents())) == 0 ## assert len(list(await metaconfig2.config.parents())) == 1 ## assert len(list(await mixconfig1.config.list())) == 1 ## assert len(list(await mixconfig2.config.list())) == 0 ## await delete_session() ## ## ## @pytest.mark.asyncio ## async def test_servermodel_updated2(): ## config_module = dispatcher.get_service('config') ## fake_context = get_fake_context('config') ## config_module.cache_root_path = 'tests/data' ## await config_module.on_join(fake_context) ## # create a new servermodel ## assert list(config_module.servermodel) == [1, 2] ## mixconfig1 = next(config_module.servermodel[1].config.list()) ## mixconfig2 = next(config_module.servermodel[2].config.list()) ## assert len(list(mixconfig1.config.list())) == 1 ## assert len(list(mixconfig2.config.list())) == 0 ## await dispatcher.publish('v1', ## 'servermodel.created', ## fake_context, ## servermodel_id=3, ## servermodel_name='name3', ## release_id=1, ## servermodel_description='name3', ## servermodel_parents_id=[1]) ## assert list(config_module.servermodel) == [1, 2, 3] ## assert len(list(await config_module.servermodel[3].config.parents())) == 1 ## assert await config_module.servermodel[3].information.get('servermodel_name') == 'name3' ## assert len(list(await mixconfig1.config.list())) == 2 ## assert len(list(await mixconfig2.config.list())) == 0 ## # ## await dispatcher.publish('v1', ## 'servermodel.updated', ## fake_context, ## servermodel_id=3, ## servermodel_name='name3-1', ## release_id=1, ## servermodel_description='name3-1', ## servermodel_parents_id=[1, 2]) ## assert list(config_module.servermodel) == [1, 2, 3] ## assert config_module.servermodel[3].information.get('servermodel_name') == 'name3-1' ## assert len(list(mixconfig1.config.list())) == 2 ## assert len(list(mixconfig2.config.list())) == 1 ## await delete_session() ## ## ## @pytest.mark.asyncio ## async def test_servermodel_updated_config(): ## config_module = dispatcher.get_service('config') ## fake_context = get_fake_context('config') ## config_module.cache_root_path = 'tests/data' ## await config_module.on_join(fake_context) ## # ## config_module.servermodel[1].property.read_write() ## assert config_module.servermodel[1].option('configuration.general.mode_conteneur_actif').value.get() == 'non' ## config_module.servermodel[1].option('configuration.general.mode_conteneur_actif').value.set('oui') ## assert config_module.servermodel[1].option('configuration.general.mode_conteneur_actif').value.get() == 'oui' ## # ## await dispatcher.publish('v1', ## 'servermodel.updated', ## fake_context, ## servermodel_id=1, ## servermodel_name='name1-1', ## release_id=1, ## servermodel_description='name1-1') ## assert config_module.servermodel[1].option('configuration.general.mode_conteneur_actif').value.get() == 'oui' ## await delete_session() ############################################################################################################################## # Server ############################################################################################################################## @pytest.mark.asyncio async def test_server_created_base(): await onjoin() config_module = dispatcher.get_service('config') fake_context = get_fake_context('config') # assert list(config_module.server) == [] await dispatcher.on_join(truncate=True) server_name = 'dns.test.lan' await dispatcher.publish('v1', 'infra.server.created', fake_context, server_name=server_name, server_description='description_created', servermodel_name='base', release_distribution='last', site_name='site_1', zones_name=['zones'], zones_ip=['1.1.1.1'], ) assert list(config_module.server) == [server_name] assert set(config_module.server[server_name]) == {'server', 'server_to_deploy', 'funcs_file'} assert config_module.server[server_name]['funcs_file'] == '/var/cache/risotto/servermodel/last/base/funcs.py' await delete_session() @pytest.mark.asyncio async def test_server_created_own_sm(): await onjoin() config_module = dispatcher.get_service('config') fake_context = get_fake_context('config') await create_servermodel() # assert list(config_module.server) == [] await dispatcher.on_join(truncate=True) server_name = 'dns.test.lan' await dispatcher.publish('v1', 'infra.server.created', fake_context, server_name=server_name, server_description='description_created', servermodel_name=SERVERMODEL_NAME, source_name=SOURCE_NAME, release_distribution='last', site_name='site_1', zones_name=['zones'], zones_ip=['1.1.1.1'], ) assert list(config_module.server) == [server_name] assert set(config_module.server[server_name]) == {'server', 'server_to_deploy', 'funcs_file'} assert config_module.server[server_name]['funcs_file'] == '/var/cache/risotto/servermodel/last/sm1/funcs.py' await delete_session() #@pytest.mark.asyncio #async def test_server_deleted(): # config_module = dispatcher.get_service('config') # config_module.cache_root_path = 'tests/data' # await config_module.on_join(fake_context) # # # assert list(config_module.server) == [3] # await dispatcher.publish('v1', # 'server.created', # fake_context, # server_id=4, # server_name='name4', # server_description='description4', # server_servermodel_id=2) # assert list(config_module.server) == [3, 4] # await dispatcher.publish('v1', # 'server.deleted', # fake_context, # server_id=4) # assert list(config_module.server) == [3] # await delete_session() @pytest.mark.asyncio async def test_server_configuration_get(): await onjoin() config_module = dispatcher.get_service('config') fake_context = get_fake_context('config') await create_servermodel() await dispatcher.on_join(truncate=True) server_name = 'dns.test.lan' await dispatcher.publish('v1', 'infra.server.created', fake_context, server_name=server_name, server_description='description_created', servermodel_name=SERVERMODEL_NAME, source_name=SOURCE_NAME, release_distribution='last', site_name='site_1', zones_name=['zones'], zones_ip=['1.1.1.1'], ) # await config_module.server[server_name]['server'].property.read_write() assert await config_module.server[server_name]['server'].option('configuration.general.number_of_interfaces').value.get() == 1 await config_module.server[server_name]['server'].option('configuration.general.number_of_interfaces').value.set(2) assert await config_module.server[server_name]['server'].option('configuration.general.number_of_interfaces').value.get() == 2 assert await config_module.server[server_name]['server_to_deploy'].option('configuration.general.number_of_interfaces').value.get() == 1 # configuration = {'server_name': server_name, 'deployed': False, 'configuration': {'configuration.general.number_of_interfaces': 1, 'configuration.general.interfaces_list': [0], 'configuration.interface_0.domain_name_eth0': 'dns.test.lan' } } values = await dispatcher.call('v1', 'setting.config.configuration.server.get', fake_context, server_name=server_name, deployed=False, ) assert values == configuration # await delete_session() @pytest.mark.asyncio async def test_server_configuration_deployed(): await onjoin() config_module = dispatcher.get_service('config') fake_context = get_fake_context('config') await create_servermodel() await dispatcher.on_join(truncate=True) server_name = 'dns.test.lan' await dispatcher.publish('v1', 'infra.server.created', fake_context, server_name=server_name, server_description='description_created', servermodel_name=SERVERMODEL_NAME, source_name=SOURCE_NAME, release_distribution='last', site_name='site_1', zones_name=['zones'], zones_ip=['1.1.1.1'], ) # await config_module.server[server_name]['server'].property.read_write() assert await config_module.server[server_name]['server'].option('configuration.general.number_of_interfaces').value.get() == 1 await config_module.server[server_name]['server'].option('configuration.general.number_of_interfaces').value.set(2) assert await config_module.server[server_name]['server'].option('configuration.general.number_of_interfaces').value.get() == 2 assert await config_module.server[server_name]['server_to_deploy'].option('configuration.general.number_of_interfaces').value.get() == 1 # configuration = {'server_name': server_name, 'deployed': False, 'configuration': {'configuration.general.number_of_interfaces': 1, 'configuration.general.interfaces_list': [0], 'configuration.interface_0.domain_name_eth0': 'dns.test.lan' } } try: await dispatcher.call('v1', 'setting.config.configuration.server.get', fake_context, server_name=server_name, ) except: pass else: raise Exception('should raise propertyerror') values = await dispatcher.call('v1', 'setting.config.configuration.server.deploy', fake_context, server_name=server_name, ) assert values == {'server_name': 'dns.test.lan', 'deployed': True} await dispatcher.call('v1', 'setting.config.configuration.server.get', fake_context, server_name=server_name, ) # await delete_session()