from importlib import import_module import pytest try: from tiramisu3 import list_sessions, delete_session as _delete_session except: from tiramisu import list_sessions, delete_session as _delete_session from .storage import STORAGE from risotto import services from risotto.context import Context #from risotto.services import load_services from risotto.dispatcher import dispatcher SOURCE_NAME = 'test' SERVERMODEL_NAME = 'sm1' def setup_module(module): # load_services(['config'], # validate=False) services.link_to_dispatcher(dispatcher, limit_services=['setting'], validate=False) config_module = dispatcher.get_service('config') config_module.save_storage = STORAGE #dispatcher.set_module('server', import_module(f'.server', 'fake_services'), True) #dispatcher.set_module('servermodel', import_module(f'.servermodel', 'fake_services'), True) def setup_function(function): config_module = dispatcher.get_service('config') config_module.server = {} config_module.servermodel = {} async def delete_session(): # delete all sessions config_module = dispatcher.get_service('config') for session in await list_sessions(storage=config_module.save_storage): await _delete_session(storage=config_module.save_storage, session_id=session) def get_fake_context(module_name): risotto_context = Context() risotto_context.username = 'test' risotto_context.paths.append(f'{module_name}.on_join') risotto_context.type = None return risotto_context async def onjoin(source=True): config_module = dispatcher.get_service('config') assert config_module.servermodel == {} assert config_module.server == {} # #config_module.cache_root_path = 'tests/data' await dispatcher.load() await dispatcher.on_join(truncate=True) if source: fake_context = get_fake_context('config') await dispatcher.call('v1', 'setting.source.create', fake_context, source_name=SOURCE_NAME, source_directory='tests/data', ) @pytest.mark.asyncio async def test_on_join(): await onjoin(False) await delete_session() @pytest.mark.asyncio async def test_source_create(): await onjoin() config_module = dispatcher.get_service('config') assert list(config_module.servermodel.keys()) == ['last_base'] assert list(config_module.server) == [] await delete_session() # FIXME {source|release}.list {source|release}.describe {source|release}.delete, ... async def create_servermodel(name=SERVERMODEL_NAME, parents_name=['base'], ): fake_context = get_fake_context('config') await dispatcher.call('v1', 'setting.servermodel.create', fake_context, servermodel_name=name, servermodel_description='servermodel 1', parents_name=parents_name, source_name=SOURCE_NAME, release_distribution='last', ) @pytest.mark.asyncio async def test_servermodel_created(): await onjoin() config_module = dispatcher.get_service('config') # assert list(config_module.servermodel) == ['last_base'] await create_servermodel() assert list(config_module.servermodel) == ['last_base', 'last_sm1'] assert not list(await config_module.servermodel['last_base'].config.parents()) assert len(list(await config_module.servermodel['last_sm1'].config.parents())) == 1 await delete_session() @pytest.mark.asyncio async def test_servermodel_created(): await onjoin() config_module = dispatcher.get_service('config') fake_context = get_fake_context('config') # assert list(config_module.servermodel) == ['last_base'] await dispatcher.call('v1', 'setting.servermodel.create', fake_context, servermodel_name='sm1', servermodel_description='servermodel 1', parents_name=['base'], source_name=SOURCE_NAME, release_distribution='last', ) assert list(config_module.servermodel) == ['last_base', 'last_sm1'] assert not list(await config_module.servermodel['last_base'].config.parents()) assert len(list(await config_module.servermodel['last_sm1'].config.parents())) == 1 await delete_session() @pytest.mark.asyncio async def test_server_created_base(): await onjoin() config_module = dispatcher.get_service('config') fake_context = get_fake_context('config') # assert list(config_module.server) == [] await dispatcher.on_join(truncate=True) server_name = 'dns.test.lan' #FIXME pas de release ?? await dispatcher.publish('v1', 'infra.server.created', fake_context, server_name=server_name, server_description='description_created', servermodel_name='base', release_distribution='last', site_name='site_1', zones_name=['zones'], zones_ip=['1.1.1.1'], ) assert list(config_module.server) == [server_name] assert set(config_module.server[server_name]) == {'server', 'server_to_deploy', 'funcs_file'} assert config_module.server[server_name]['funcs_file'] == '/var/cache/risotto/servermodel/last/base/funcs.py' await delete_session() #@pytest.mark.asyncio #async def test_server_deleted(): # config_module = dispatcher.get_service('config') # config_module.cache_root_path = 'tests/data' # await config_module.on_join(fake_context) # # # assert list(config_module.server) == [3] # await dispatcher.publish('v1', # 'server.created', # fake_context, # server_id=4, # server_name='name4', # server_description='description4', # server_servermodel_id=2) # assert list(config_module.server) == [3, 4] # await dispatcher.publish('v1', # 'server.deleted', # fake_context, # server_id=4) # assert list(config_module.server) == [3] # await delete_session() # # #@pytest.mark.asyncio #async def test_servermodel_created(): # config_module = dispatcher.get_service('config') # fake_context = get_fake_context('config') # config_module.cache_root_path = 'tests/data' # await config_module.on_join(fake_context) # # # assert list(config_module.servermodel) == [1, 2] # servermodel = {'servermodeid': 3, # 'servermodelname': 'name3'} # await dispatcher.publish('v1', # 'servermodel.created', # fake_context, # servermodel_id=3, # servermodel_description='name3', # release_id=1, # servermodel_name='name3') # assert list(config_module.servermodel) == [1, 2, 3] # assert not list(await config_module.servermodel[3].config.parents()) # await delete_session() # # #@pytest.mark.asyncio #async def test_servermodel_herited_created(): # config_module = dispatcher.get_service('config') # fake_context = get_fake_context('config') # config_module.cache_root_path = 'tests/data' # await config_module.on_join(fake_context) # # # assert list(config_module.servermodel) == [1, 2] # await dispatcher.publish('v1', # 'servermodel.created', # fake_context, # servermodel_id=3, # servermodel_name='name3', # release_id=1, # servermodel_description='name3', # servermodel_parents_id=[1]) # assert list(config_module.servermodel) == [1, 2, 3] # assert len(list(await config_module.servermodel[3].config.parents())) == 1 # await delete_session() # # #@pytest.mark.asyncio #async def test_servermodel_multi_herited_created(): # config_module = dispatcher.get_service('config') # fake_context = get_fake_context('config') # config_module.cache_root_path = 'tests/data' # await config_module.on_join(fake_context) # # # assert list(config_module.servermodel) == [1, 2] # await dispatcher.publish('v1', # 'servermodel.created', # fake_context, # servermodel_id=3, # servermodel_name='name3', # release_id=1, # servermodel_description='name3', # servermodel_parents_id=[1, 2]) # assert list(config_module.servermodel) == [1, 2, 3] # assert len(list(await config_module.servermodel[3].config.parents())) == 2 # await delete_session() # # ##@pytest.mark.asyncio ##async def test_servermodel_updated_not_exists(): ## config_module = dispatcher.get_service('config') ## fake_context = get_fake_context('config') ## config_module.cache_root_path = 'tests/data' ## await config_module.on_join(fake_context) ## # ## assert list(config_module.servermodel) == [1, 2] ## await dispatcher.publish('v1', ## 'servermodel.updated', ## fake_context, ## servermodel_id=3, ## servermodel_name='name3', ## release_id=1, ## servermodel_description='name3', ## servermodel_parents_id=[1, 2]) ## assert list(config_module.servermodel) == [1, 2, 3] ## assert len(list(await config_module.servermodel[3].config.parents())) == 2 ## await delete_session() ## ## ## @pytest.mark.asyncio ## async def test_servermodel_updated1(): ## config_module = dispatcher.get_service('config') ## fake_context = get_fake_context('config') ## config_module.cache_root_path = 'tests/data' ## await config_module.on_join(fake_context) ## # ## assert list(config_module.servermodel) == [1, 2] ## metaconfig1 = config_module.servermodel[1] ## metaconfig2 = config_module.servermodel[2] ## mixconfig1 = (await metaconfig1.config.list())[0] ## mixconfig2 = (await metaconfig2.config.list())[0] ## assert len(list(await metaconfig1.config.parents())) == 0 ## assert len(list(await metaconfig2.config.parents())) == 1 ## assert len(list(await mixconfig1.config.list())) == 1 ## assert len(list(await mixconfig2.config.list())) == 0 ## # ## await dispatcher.publish('v1', ## 'servermodel.updated', ## fake_context, ## servermodel_id=1, ## servermodel_name='name1-1', ## release_id=1, ## servermodel_description='name1-1') ## assert set(config_module.servermodel) == {1, 2} ## assert config_module.servermodel[1].information.get('servermodel_name') == 'name1-1' ## assert metaconfig1 != config_module.servermodel[1] ## assert metaconfig2 == config_module.servermodel[2] ## metaconfig1 = config_module.servermodel[1] ## assert mixconfig1 != next(metaconfig1.config.list()) ## mixconfig1 = next(metaconfig1.config.list()) ## # ## assert len(list(await metaconfig1.config.parents())) == 0 ## assert len(list(await metaconfig2.config.parents())) == 1 ## assert len(list(await mixconfig1.config.list())) == 1 ## assert len(list(await mixconfig2.config.list())) == 0 ## await delete_session() ## ## ## @pytest.mark.asyncio ## async def test_servermodel_updated2(): ## config_module = dispatcher.get_service('config') ## fake_context = get_fake_context('config') ## config_module.cache_root_path = 'tests/data' ## await config_module.on_join(fake_context) ## # create a new servermodel ## assert list(config_module.servermodel) == [1, 2] ## mixconfig1 = next(config_module.servermodel[1].config.list()) ## mixconfig2 = next(config_module.servermodel[2].config.list()) ## assert len(list(mixconfig1.config.list())) == 1 ## assert len(list(mixconfig2.config.list())) == 0 ## await dispatcher.publish('v1', ## 'servermodel.created', ## fake_context, ## servermodel_id=3, ## servermodel_name='name3', ## release_id=1, ## servermodel_description='name3', ## servermodel_parents_id=[1]) ## assert list(config_module.servermodel) == [1, 2, 3] ## assert len(list(await config_module.servermodel[3].config.parents())) == 1 ## assert await config_module.servermodel[3].information.get('servermodel_name') == 'name3' ## assert len(list(await mixconfig1.config.list())) == 2 ## assert len(list(await mixconfig2.config.list())) == 0 ## # ## await dispatcher.publish('v1', ## 'servermodel.updated', ## fake_context, ## servermodel_id=3, ## servermodel_name='name3-1', ## release_id=1, ## servermodel_description='name3-1', ## servermodel_parents_id=[1, 2]) ## assert list(config_module.servermodel) == [1, 2, 3] ## assert config_module.servermodel[3].information.get('servermodel_name') == 'name3-1' ## assert len(list(mixconfig1.config.list())) == 2 ## assert len(list(mixconfig2.config.list())) == 1 ## await delete_session() ## ## ## @pytest.mark.asyncio ## async def test_servermodel_updated_config(): ## config_module = dispatcher.get_service('config') ## fake_context = get_fake_context('config') ## config_module.cache_root_path = 'tests/data' ## await config_module.on_join(fake_context) ## # ## config_module.servermodel[1].property.read_write() ## assert config_module.servermodel[1].option('creole.general.mode_conteneur_actif').value.get() == 'non' ## config_module.servermodel[1].option('creole.general.mode_conteneur_actif').value.set('oui') ## assert config_module.servermodel[1].option('creole.general.mode_conteneur_actif').value.get() == 'oui' ## # ## await dispatcher.publish('v1', ## 'servermodel.updated', ## fake_context, ## servermodel_id=1, ## servermodel_name='name1-1', ## release_id=1, ## servermodel_description='name1-1') ## assert config_module.servermodel[1].option('creole.general.mode_conteneur_actif').value.get() == 'oui' ## await delete_session() # # #@pytest.mark.asyncio #async def test_server_configuration_get(): # config_module = dispatcher.get_service('config') # fake_context = get_fake_context('config') # config_module.cache_root_path = 'tests/data' # await config_module.on_join(fake_context) # # # await config_module.server[3]['server_to_deploy'].property.read_write() # assert await config_module.server[3]['server_to_deploy'].option('creole.general.mode_conteneur_actif').value.get() == 'non' # await config_module.server[3]['server_to_deploy'].option('creole.general.mode_conteneur_actif').value.set('oui') # assert await config_module.server[3]['server_to_deploy'].option('creole.general.mode_conteneur_actif').value.get() == 'oui' # assert await config_module.server[3]['server'].option('creole.general.mode_conteneur_actif').value.get() == 'non' # # # values = await dispatcher.call('v1', # 'config.configuration.server.get', # fake_context, # server_id=3) # configuration = {'configuration': # {'creole.general.mode_conteneur_actif': 'non', # 'creole.general.master.master': [], # 'creole.general.master.slave1': [], # 'creole.general.master.slave2': [], # 'containers.container0.files.file0.mkdir': False, # 'containers.container0.files.file0.name': '/etc/mailname', # 'containers.container0.files.file0.rm': False, # 'containers.container0.files.file0.source': 'mailname', # 'containers.container0.files.file0.activate': True}, # 'server_id': 3, # 'deployed': True} # assert values == configuration # # # values = await dispatcher.call('v1', # 'config.configuration.server.get', # fake_context, # server_id=3, # deployed=False) # configuration['configuration']['creole.general.mode_conteneur_actif'] = 'oui' # configuration['deployed'] = False # assert values == configuration # await delete_session() # # #@pytest.mark.asyncio #async def test_config_deployed(): # config_module = dispatcher.get_service('config') # fake_context = get_fake_context('config') # config_module.cache_root_path = 'tests/data' # await config_module.on_join(fake_context) # # # await config_module.server[3]['server_to_deploy'].property.read_write() # assert await config_module.server[3]['server_to_deploy'].option('creole.general.mode_conteneur_actif').value.get() == 'non' # await config_module.server[3]['server_to_deploy'].option('creole.general.mode_conteneur_actif').value.set('oui') # assert await config_module.server[3]['server_to_deploy'].option('creole.general.mode_conteneur_actif').value.get() == 'oui' # assert await config_module.server[3]['server'].option('creole.general.mode_conteneur_actif').value.get() == 'non' # values = await dispatcher.publish('v1', # 'config.configuration.server.deploy', # fake_context, # server_id=3) # assert await config_module.server[3]['server_to_deploy'].option('creole.general.mode_conteneur_actif').value.get() == 'oui' # assert await config_module.server[3]['server'].option('creole.general.mode_conteneur_actif').value.get() == 'oui' # await delete_session()