Compare commits

...

21 Commits

Author SHA1 Message Date
4fc3e74bbd Merge branch 'develop' into dist/risotto/risotto-2.8.0/develop 2021-04-13 10:31:24 +02:00
5663b2768b if not Risotto module, do not failed 2021-04-13 10:31:14 +02:00
83d74c2b06 Merge branch 'develop' into dist/risotto/risotto-2.8.0/develop 2021-04-12 15:11:54 +02:00
01834c6ba7 add check_role to dispatcher 2021-04-12 15:11:46 +02:00
6a27b002ff Merge branch 'develop' into dist/risotto/risotto-2.8.0/develop 2021-03-27 10:59:19 +01:00
8fdc34c4d3 fix 2021-03-27 10:59:10 +01:00
e2d73932c0 add sdnotify dependency 2020-11-14 19:11:57 +01:00
980a119ef9 Merge branch 'develop' into dist/risotto/risotto-2.8.0/develop 2020-11-14 19:01:34 +01:00
f623feb8a8 add systemd notifier 2020-11-14 19:01:28 +01:00
b9da2ce686 Merge branch 'develop' into dist/risotto/risotto-2.8.0/develop 2020-11-14 08:12:50 +01:00
46f8a4323b add pki informations 2020-11-14 08:12:39 +01:00
941261c830 Merge branch 'develop' into dist/risotto/risotto-2.8.0/develop 2020-10-14 18:30:13 +02:00
6c4bbb3dca add password support 2020-10-14 18:30:05 +02:00
98c77bf719 Merge branch 'develop' into dist/risotto/risotto-2.8.0/develop 2020-09-20 21:33:15 +02:00
279e3a7c4c better debugging 2020-09-20 21:33:04 +02:00
1b9d87fa53 Merge branch 'develop' into dist/risotto/risotto-2.8.0/develop 2020-09-19 10:33:34 +02:00
13c7d5816c update config 2020-09-19 10:33:27 +02:00
0e988d7040 Merge branch 'develop' into dist/risotto/risotto-2.8.0/develop 2020-09-19 09:20:04 +02:00
a89e512266 update config 2020-09-19 09:18:28 +02:00
be97d757d9 Merge branch 'develop' into dist/risotto/risotto-2.8.0/develop 2020-09-16 17:38:04 +02:00
7afccab9b1 publish use now postgresql 2020-09-16 17:37:46 +02:00
9 changed files with 300 additions and 204 deletions

6
debian/control vendored
View File

@ -9,7 +9,11 @@ Homepage: https://forge.cadoles.com/Infra/risotto
Package: python3-risotto Package: python3-risotto
Architecture: any Architecture: any
Pre-Depends: dpkg, python3, ${misc:Pre-Depends} Pre-Depends: dpkg, python3, ${misc:Pre-Depends}
Depends: ${python:Depends}, ${misc:Depends}, python3-asyncpg, python3-rougail, python3-aiohttp Depends: ${python:Depends}, ${misc:Depends},
python3-asyncpg,
python3-rougail,
python3-aiohttp,
python3-sdnotify
Description: configuration manager libraries Description: configuration manager libraries
Package: risotto Package: risotto

View File

@ -1,13 +1,16 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from sdnotify import SystemdNotifier
from asyncio import get_event_loop from asyncio import get_event_loop
from risotto import get_app from risotto import get_app
if __name__ == '__main__': if __name__ == '__main__':
notifier = SystemdNotifier()
loop = get_event_loop() loop = get_event_loop()
loop.run_until_complete(get_app(loop)) loop.run_until_complete(get_app(loop))
print('HTTP server ready')
notifier.notify("READY=1")
try: try:
print('HTTP server ready')
loop.run_forever() loop.run_forever()
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass

View File

@ -1,6 +1,7 @@
from os import environ from os import environ
from os.path import isfile from os.path import isfile
from configobj import ConfigObj from configobj import ConfigObj
from uuid import uuid4
CONFIG_FILE = environ.get('CONFIG_FILE', '/etc/risotto/risotto.conf') CONFIG_FILE = environ.get('CONFIG_FILE', '/etc/risotto/risotto.conf')
@ -20,10 +21,6 @@ if 'CONFIGURATION_DIR' in environ:
CONFIGURATION_DIR = environ['CONFIGURATION_DIR'] CONFIGURATION_DIR = environ['CONFIGURATION_DIR']
else: else:
CONFIGURATION_DIR = config.get('CONFIGURATION_DIR', '/srv/risotto/configurations') CONFIGURATION_DIR = config.get('CONFIGURATION_DIR', '/srv/risotto/configurations')
if 'PROVIDER_FACTORY_CONFIG_DIR' in environ:
PROVIDER_FACTORY_CONFIG_DIR = environ['PROVIDER_FACTORY_CONFIG_DIR']
else:
PROVIDER_FACTORY_CONFIG_DIR = config.get('PROVIDER_FACTORY_CONFIG_DIR', '/srv/factory')
if 'DEFAULT_USER' in environ: if 'DEFAULT_USER' in environ:
DEFAULT_USER = environ['DEFAULT_USER'] DEFAULT_USER = environ['DEFAULT_USER']
else: else:
@ -52,6 +49,18 @@ if 'TIRAMISU_DB_USER' in environ:
TIRAMISU_DB_USER = environ['TIRAMISU_DB_USER'] TIRAMISU_DB_USER = environ['TIRAMISU_DB_USER']
else: else:
TIRAMISU_DB_USER = config.get('TIRAMISU_DB_USER', 'tiramisu') TIRAMISU_DB_USER = config.get('TIRAMISU_DB_USER', 'tiramisu')
if 'CELERYRISOTTO_DB_NAME' in environ:
CELERYRISOTTO_DB_NAME = environ['CELERYRISOTTO_DB_NAME']
else:
CELERYRISOTTO_DB_NAME = config.get('CELERYRISOTTO_DB_NAME', None)
if 'CELERYRISOTTO_DB_PASSWORD' in environ:
CELERYRISOTTO_DB_PASSWORD = environ['CELERYRISOTTO_DB_PASSWORD']
else:
CELERYRISOTTO_DB_PASSWORD = config.get('CELERYRISOTTO_DB_PASSWORD', None)
if 'CELERYRISOTTO_DB_USER' in environ:
CELERYRISOTTO_DB_USER = environ['CELERYRISOTTO_DB_USER']
else:
CELERYRISOTTO_DB_USER = config.get('CELERYRISOTTO_DB_USER', None)
if 'DB_ADDRESS' in environ: if 'DB_ADDRESS' in environ:
DB_ADDRESS = environ['DB_ADDRESS'] DB_ADDRESS = environ['DB_ADDRESS']
else: else:
@ -76,6 +85,44 @@ if 'TMP_DIR' in environ:
TMP_DIR = environ['TMP_DIR'] TMP_DIR = environ['TMP_DIR']
else: else:
TMP_DIR = config.get('TMP_DIR', '/tmp') TMP_DIR = config.get('TMP_DIR', '/tmp')
if 'IMAGE_PATH' in environ:
IMAGE_PATH = environ['IMAGE_PATH']
else:
IMAGE_PATH = config.get('IMAGE_PATH', '/tmp')
if 'PASSWORD_ADMIN_USERNAME' in environ:
PASSWORD_ADMIN_USERNAME = environ['PASSWORD_ADMIN_USERNAME']
else:
PASSWORD_ADMIN_USERNAME = config.get('PASSWORD_ADMIN_USERNAME', 'risotto')
if 'PASSWORD_ADMIN_EMAIL' in environ:
PASSWORD_ADMIN_EMAIL = environ['PASSWORD_ADMIN_EMAIL']
else:
# this parameter is mandatory
PASSWORD_ADMIN_EMAIL = config['PASSWORD_ADMIN_EMAIL']
if 'PASSWORD_ADMIN_PASSWORD' in environ:
PASSWORD_ADMIN_PASSWORD = environ['PASSWORD_ADMIN_PASSWORD']
else:
# this parameter is mandatory
PASSWORD_ADMIN_PASSWORD = config['PASSWORD_ADMIN_PASSWORD']
if 'PASSWORD_DEVICE_IDENTIFIER' in environ:
PASSWORD_DEVICE_IDENTIFIER = environ['PASSWORD_DEVICE_IDENTIFIER']
else:
PASSWORD_DEVICE_IDENTIFIER = config.get('PASSWORD_DEVICE_IDENTIFIER', uuid4())
if 'PASSWORD_URL' in environ:
PASSWORD_URL = environ['PASSWORD_URL']
else:
PASSWORD_URL = config.get('PASSWORD_URL', 'https://localhost:8001/')
if 'PKI_ADMIN_PASSWORD' in environ:
PKI_ADMIN_PASSWORD = environ['PKI_ADMIN_PASSWORD']
else:
PKI_ADMIN_PASSWORD = config['PKI_ADMIN_PASSWORD']
if 'PKI_ADMIN_EMAIL' in environ:
PKI_ADMIN_EMAIL = environ['PKI_ADMIN_EMAIL']
else:
PKI_ADMIN_EMAIL = config['PKI_ADMIN_EMAIL']
if 'PKI_URL' in environ:
PKI_URL = environ['PKI_URL']
else:
PKI_URL = config.get('PKI_URL', 'http://localhost:8002')
def dsn_factory(database, user, password, address=DB_ADDRESS): def dsn_factory(database, user, password, address=DB_ADDRESS):
@ -85,6 +132,7 @@ def dsn_factory(database, user, password, address=DB_ADDRESS):
_config = {'database': {'dsn': dsn_factory(RISOTTO_DB_NAME, RISOTTO_DB_USER, RISOTTO_DB_PASSWORD), _config = {'database': {'dsn': dsn_factory(RISOTTO_DB_NAME, RISOTTO_DB_USER, RISOTTO_DB_PASSWORD),
'tiramisu_dsn': dsn_factory(TIRAMISU_DB_NAME, TIRAMISU_DB_USER, TIRAMISU_DB_PASSWORD), 'tiramisu_dsn': dsn_factory(TIRAMISU_DB_NAME, TIRAMISU_DB_USER, TIRAMISU_DB_PASSWORD),
'celery_dsn': dsn_factory(CELERYRISOTTO_DB_NAME, CELERYRISOTTO_DB_USER, CELERYRISOTTO_DB_PASSWORD)
}, },
'http_server': {'port': RISOTTO_PORT, 'http_server': {'port': RISOTTO_PORT,
'default_user': DEFAULT_USER}, 'default_user': DEFAULT_USER},
@ -97,13 +145,24 @@ _config = {'database': {'dsn': dsn_factory(RISOTTO_DB_NAME, RISOTTO_DB_USER, RIS
'sql_dir': SQL_DIR, 'sql_dir': SQL_DIR,
'tmp_dir': TMP_DIR, 'tmp_dir': TMP_DIR,
}, },
'password': {'admin_username': PASSWORD_ADMIN_USERNAME,
'admin_email': PASSWORD_ADMIN_EMAIL,
'admin_password': PASSWORD_ADMIN_PASSWORD,
'device_identifier': PASSWORD_DEVICE_IDENTIFIER,
'service_url': PASSWORD_URL,
},
'pki': {'admin_password': PKI_ADMIN_PASSWORD,
'owner': PKI_ADMIN_EMAIL,
'url': PKI_URL,
},
'cache': {'root_path': CACHE_ROOT_PATH}, 'cache': {'root_path': CACHE_ROOT_PATH},
'servermodel': {'internal_source_path': SRV_SEED_PATH, 'servermodel': {'internal_source_path': SRV_SEED_PATH,
'internal_source': 'internal'}, 'internal_source': 'internal'},
'submodule': {'allow_insecure_https': False, 'submodule': {'allow_insecure_https': False,
'pki': '192.168.56.112'}, 'pki': '192.168.56.112'},
'provider': {'factory_configuration_dir': PROVIDER_FACTORY_CONFIG_DIR, 'provider': {'factory_configuration_filename': 'infra.json',
'factory_configuration_filename': 'infra.json'}, 'packer_filename': 'recipe.json',
'risotto_images_dir': IMAGE_PATH},
} }

View File

@ -39,15 +39,40 @@ class Controller:
**kwargs, **kwargs,
): ):
""" a wrapper to dispatcher's publish""" """ a wrapper to dispatcher's publish"""
version, message = uri.split('.', 1)
if args: if args:
raise ValueError(_(f'the URI "{uri}" can only be published with keyword arguments')) raise ValueError(_(f'the URI "{uri}" can only be published with keyword arguments'))
version, message = uri.split('.', 1)
await dispatcher.publish(version, await dispatcher.publish(version,
message, message,
risotto_context, risotto_context,
**kwargs, **kwargs,
) )
@staticmethod
async def check_role(self,
uri: str,
username: str,
**kwargs: dict,
) -> None:
# create a new config
async with await Config(dispatcher.option) as config:
await config.property.read_write()
await config.option('message').value.set(uri)
subconfig = config.option(uri)
for key, value in kwargs.items():
try:
await subconfig.option(key).value.set(value)
except AttributeError:
if get_config()['global']['debug']:
print_exc()
raise ValueError(_(f'unknown parameter in "{uri}": "{key}"'))
except ValueOptionError as err:
raise ValueError(_(f'invalid parameter in "{uri}": {err}'))
await dispatcher.check_role(subconfig,
username,
uri,
)
async def on_join(self, async def on_join(self,
risotto_context, risotto_context,
): ):

View File

@ -4,6 +4,7 @@ try:
except: except:
from tiramisu import Config from tiramisu import Config
from tiramisu.error import ValueOptionError from tiramisu.error import ValueOptionError
from asyncio import get_event_loop, ensure_future
from traceback import print_exc from traceback import print_exc
from copy import copy from copy import copy
from typing import Dict, Callable, List, Optional from typing import Dict, Callable, List, Optional
@ -15,7 +16,6 @@ from .logger import log
from .config import get_config from .config import get_config
from .context import Context from .context import Context
from . import register from . import register
from .remote import Remote
class CallDispatcher: class CallDispatcher:
@ -79,7 +79,7 @@ class CallDispatcher:
""" execute the function associate with specified uri """ execute the function associate with specified uri
arguments are validate before arguments are validate before
""" """
risotto_context = self.build_new_context(old_risotto_context, risotto_context = self.build_new_context(old_risotto_context.__dict__,
version, version,
message, message,
'rpc', 'rpc',
@ -88,20 +88,35 @@ class CallDispatcher:
raise CallError(_(f'cannot find version of message "{version}"')) raise CallError(_(f'cannot find version of message "{version}"'))
if message not in self.messages[version]: if message not in self.messages[version]:
raise CallError(_(f'cannot find message "{version}.{message}"')) raise CallError(_(f'cannot find message "{version}.{message}"'))
function_objs = [self.messages[version][message]] function_obj = self.messages[version][message]
# do not start a new database connection # do not start a new database connection
if hasattr(old_risotto_context, 'connection'): if hasattr(old_risotto_context, 'connection'):
risotto_context.connection = old_risotto_context.connection risotto_context.connection = old_risotto_context.connection
return await self.launch(version, await self.check_message_type(risotto_context,
message, kwargs,
risotto_context, )
check_role, config_arguments = await self.load_kwargs_to_config(risotto_context,
f'{version}.{message}',
kwargs,
check_role,
internal,
)
return await self.launch(risotto_context,
kwargs, kwargs,
function_objs, config_arguments,
internal, function_obj,
) )
else: else:
try: try:
await self.check_message_type(risotto_context,
kwargs,
)
config_arguments = await self.load_kwargs_to_config(risotto_context,
f'{version}.{message}',
kwargs,
check_role,
internal,
)
async with self.pool.acquire() as connection: async with self.pool.acquire() as connection:
await connection.set_type_codec( await connection.set_type_codec(
'json', 'json',
@ -111,13 +126,10 @@ class CallDispatcher:
) )
risotto_context.connection = connection risotto_context.connection = connection
async with connection.transaction(): async with connection.transaction():
return await self.launch(version, return await self.launch(risotto_context,
message,
risotto_context,
check_role,
kwargs, kwargs,
function_objs, config_arguments,
internal, function_obj,
) )
except CallError as err: except CallError as err:
raise err raise err
@ -139,66 +151,80 @@ class CallDispatcher:
class PublishDispatcher: class PublishDispatcher:
async def register_remote(self) -> None:
print()
print(_('======== Registered remote event ========'))
self.listened_connection = await self.pool.acquire()
for version, messages in self.messages.items():
for message, message_infos in messages.items():
# event not emit locally
if message_infos['pattern'] == 'event' and 'functions' in message_infos and message_infos['functions']:
# module, submodule, submessage = message.split('.', 2)
# if f'{module}.{submodule}' not in self.injected_self:
uri = f'{version}.{message}'
print(f' - {uri}')
await self.listened_connection.add_listener(uri,
self.to_async_publish,
)
async def publish(self, async def publish(self,
version: str, version: str,
message: str, message: str,
old_risotto_context: Context, risotto_context: Context,
check_role: bool=False,
internal: bool=True,
**kwargs, **kwargs,
) -> None: ) -> None:
risotto_context = self.build_new_context(old_risotto_context, if version not in self.messages or message not in self.messages[version]:
raise ValueError(_(f'cannot find URI "{version}.{message}"'))
# publish to remote
remote_kw = dumps({'kwargs': kwargs,
'context': {'username': risotto_context.username,
'paths': risotto_context.paths,
}
})
# FIXME should be better :/
remote_kw = remote_kw.replace("'", "''")
await risotto_context.connection.execute(f'NOTIFY "{version}.{message}", \'{remote_kw}\'')
def to_async_publish(self,
con: 'asyncpg.connection.Connection',
pid: int,
uri: str,
payload: str,
) -> None:
version, message = uri.split('.', 1)
loop = get_event_loop()
remote_kw = loads(payload)
risotto_context = self.build_new_context(remote_kw['context'],
version, version,
message, message,
'event', 'event',
) )
try: callback = lambda: ensure_future(self._publish(version,
function_objs = self.messages[version][message].get('functions', []) message,
except KeyError: risotto_context,
raise ValueError(_(f'cannot find message {version}.{message}')) **remote_kw['kwargs'],
# do not start a new database connection ))
if hasattr(old_risotto_context, 'connection'): loop.call_soon(callback)
# publish to remove
remote_kw = dumps({'kwargs': kwargs, async def _publish(self,
'context': risotto_context.__dict__, version: str,
}) message: str,
risotto_context.connection = old_risotto_context.connection risotto_context: Context,
# FIXME should be better :/ **kwargs,
remote_kw = remote_kw.replace("'", "''") ) -> None:
await risotto_context.connection.execute(f'NOTIFY "{version}.{message}", \'{remote_kw}\'') config_arguments = await self.load_kwargs_to_config(risotto_context,
return await self.launch(version, f'{version}.{message}',
message, kwargs,
risotto_context, False,
check_role, False,
kwargs, )
function_objs, for function_obj in self.messages[version][message]['functions']:
internal, async with self.pool.acquire() as connection:
) try:
async with self.pool.acquire() as connection: await self.check_message_type(risotto_context,
try: kwargs,
await connection.set_type_codec( )
'json',
encoder=dumps,
decoder=loads,
schema='pg_catalog'
)
risotto_context.connection = connection
async with connection.transaction():
return await self.launch(version,
message,
risotto_context,
check_role,
kwargs,
function_objs,
internal,
)
except CallError as err:
pass
except Exception as err:
# if there is a problem with arguments, log and do nothing
if get_config()['global']['debug']:
print_exc()
async with self.pool.acquire() as connection:
await connection.set_type_codec( await connection.set_type_codec(
'json', 'json',
encoder=dumps, encoder=dumps,
@ -207,18 +233,37 @@ class PublishDispatcher:
) )
risotto_context.connection = connection risotto_context.connection = connection
async with connection.transaction(): async with connection.transaction():
await log.error_msg(risotto_context, kwargs, err) await self.launch(risotto_context,
kwargs,
config_arguments,
function_obj,
)
except CallError as err:
pass
except Exception as err:
# if there is a problem with arguments, log and do nothing
if get_config()['global']['debug']:
print_exc()
async with self.pool.acquire() as connection:
await connection.set_type_codec(
'json',
encoder=dumps,
decoder=loads,
schema='pg_catalog'
)
risotto_context.connection = connection
async with connection.transaction():
await log.error_msg(risotto_context, kwargs, err)
class Dispatcher(register.RegisterDispatcher, class Dispatcher(register.RegisterDispatcher,
Remote,
CallDispatcher, CallDispatcher,
PublishDispatcher): PublishDispatcher):
""" Manage message (call or publish) """ Manage message (call or publish)
so launch a function when a message is called so launch a function when a message is called
""" """
def build_new_context(self, def build_new_context(self,
old_risotto_context: Context, context: dict,
version: str, version: str,
message: str, message: str,
type: str, type: str,
@ -227,8 +272,8 @@ class Dispatcher(register.RegisterDispatcher,
""" """
uri = version + '.' + message uri = version + '.' + message
risotto_context = Context() risotto_context = Context()
risotto_context.username = old_risotto_context.username risotto_context.username = context['username']
risotto_context.paths = copy(old_risotto_context.paths) risotto_context.paths = copy(context['paths'])
risotto_context.paths.append(uri) risotto_context.paths.append(uri)
risotto_context.uri = uri risotto_context.uri = uri
risotto_context.type = type risotto_context.type = type
@ -288,7 +333,7 @@ class Dispatcher(register.RegisterDispatcher,
parameters = await subconfig.value.dict() parameters = await subconfig.value.dict()
if extra_parameters: if extra_parameters:
parameters.update(extra_parameters) parameters.update(extra_parameters)
return parameters return parameters
def get_service(self, def get_service(self,
name: str): name: str):
@ -297,14 +342,15 @@ class Dispatcher(register.RegisterDispatcher,
async def check_role(self, async def check_role(self,
config: Config, config: Config,
user_login: str, user_login: str,
uri: str) -> None: uri: str,
) -> None:
async with self.pool.acquire() as connection: async with self.pool.acquire() as connection:
async with connection.transaction(): async with connection.transaction():
# Verify if user exists and get ID # Verify if user exists and get ID
sql = ''' sql = '''
SELECT UserId SELECT UserId
FROM UserUser FROM UserUser
WHERE UserLogin = $1 WHERE Login = $1
''' '''
user_id = await connection.fetchval(sql, user_id = await connection.fetchval(sql,
user_login) user_login)
@ -342,65 +388,55 @@ class Dispatcher(register.RegisterDispatcher,
raise NotAllowedError(_(f'You ({user_login}) don\'t have any authorisation to access to "{uri}"')) raise NotAllowedError(_(f'You ({user_login}) don\'t have any authorisation to access to "{uri}"'))
async def launch(self, async def launch(self,
version: str,
message: str,
risotto_context: Context, risotto_context: Context,
check_role: bool,
kwargs: Dict, kwargs: Dict,
function_objs: List, config_arguments: dict,
internal: bool, function_obj: Callable,
) -> Optional[Dict]: ) -> Optional[Dict]:
await self.check_message_type(risotto_context, # so send the message
kwargs) function = function_obj['function']
config_arguments = await self.load_kwargs_to_config(risotto_context, risotto_context.module = function_obj['module'].split('.', 1)[0]
f'{version}.{message}', function_name = function.__name__
kwargs, info_msg = _(f"in function {function_obj['full_module_name']}.{function_name}")
check_role, # build argument for this function
internal, if risotto_context.type == 'rpc':
) kw = config_arguments
# config is ok, so send the message else:
for function_obj in function_objs: kw = {}
function = function_obj['function'] for key, value in config_arguments.items():
submodule_name = function_obj['module'] if key in function_obj['arguments']:
function_name = function.__name__ kw[key] = value
risotto_context.module = submodule_name.split('.', 1)[0]
info_msg = _(f'in module {submodule_name}.{function_name}')
# build argument for this function
if risotto_context.type == 'rpc':
kw = config_arguments
else:
kw = {}
for key, value in config_arguments.items():
if key in function_obj['arguments']:
kw[key] = value
kw['risotto_context'] = risotto_context kw['risotto_context'] = risotto_context
returns = await function(self.injected_self[function_obj['module']], **kw) returns = await function(self.get_service(function_obj['module']), **kw)
if risotto_context.type == 'rpc': if risotto_context.type == 'rpc':
# valid returns # valid returns
await self.valid_call_returns(risotto_context, await self.valid_call_returns(risotto_context,
function, function,
returns, returns,
kwargs) kwargs,
# log the success )
await log.info_msg(risotto_context, # log the success
{'arguments': kwargs, await log.info_msg(risotto_context,
'returns': returns}, {'arguments': kwargs,
info_msg) 'returns': returns},
# notification info_msg,
if function_obj.get('notification'): )
notif_version, notif_message = function_obj['notification'].split('.', 1) # notification
if not isinstance(returns, list): if function_obj.get('notification'):
send_returns = [returns] notif_version, notif_message = function_obj['notification'].split('.', 1)
else: if not isinstance(returns, list):
send_returns = returns send_returns = [returns]
for ret in send_returns: else:
await self.publish(notif_version, send_returns = returns
notif_message, for ret in send_returns:
risotto_context, await self.publish(notif_version,
**ret) notif_message,
if risotto_context.type == 'rpc': risotto_context,
return returns **ret,
)
if risotto_context.type == 'rpc':
return returns
dispatcher = Dispatcher() dispatcher = Dispatcher()

View File

@ -29,7 +29,8 @@ def create_context(request):
def register(version: str, def register(version: str,
path: str): path: str,
):
""" Decorator to register function to the http route """ Decorator to register function to the http route
""" """
def decorator(function): def decorator(function):
@ -41,7 +42,9 @@ def register(version: str,
class extra_route_handler: class extra_route_handler:
async def __new__(cls, request): async def __new__(cls,
request,
):
kwargs = dict(request.match_info) kwargs = dict(request.match_info)
kwargs['request'] = request kwargs['request'] = request
kwargs['risotto_context'] = create_context(request) kwargs['risotto_context'] = create_context(request)
@ -96,11 +99,13 @@ async def handle(request):
print_exc() print_exc()
raise HTTPInternalServerError(reason=str(err)) raise HTTPInternalServerError(reason=str(err))
return Response(text=dumps({'response': text}), return Response(text=dumps({'response': text}),
content_type='application/json') content_type='application/json',
)
async def api(request, async def api(request,
risotto_context): risotto_context,
):
global TIRAMISU global TIRAMISU
if not TIRAMISU: if not TIRAMISU:
# check all URI that have an associated role # check all URI that have an associated role
@ -152,7 +157,8 @@ async def get_app(loop):
for version in versions: for version in versions:
api_route = {'function': api, api_route = {'function': api,
'version': version, 'version': version,
'path': f'/api/{version}'} 'path': f'/api/{version}',
}
extra_handler = type(api_route['path'], (extra_route_handler,), api_route) extra_handler = type(api_route['path'], (extra_route_handler,), api_route)
routes.append(get(api_route['path'], extra_handler)) routes.append(get(api_route['path'], extra_handler))
print(f' - {api_route["path"]} (http_get)') print(f' - {api_route["path"]} (http_get)')
@ -174,7 +180,10 @@ async def get_app(loop):
await dispatcher.register_remote() await dispatcher.register_remote()
print() print()
await dispatcher.on_join() await dispatcher.on_join()
return await loop.create_server(app.make_handler(), '*', get_config()['http_server']['port']) return await loop.create_server(app.make_handler(),
'*',
get_config()['http_server']['port'],
)
TIRAMISU = None TIRAMISU = None

View File

@ -23,7 +23,7 @@ class Services():
def load_services(self): def load_services(self):
for entry_point in iter_entry_points(group='risotto_services'): for entry_point in iter_entry_points(group='risotto_services'):
self.services.setdefault(entry_point.name, []) self.services.setdefault(entry_point.name, {})
self.services_loaded = True self.services_loaded = True
def load_modules(self, def load_modules(self,
@ -32,21 +32,20 @@ class Services():
for entry_point in iter_entry_points(group='risotto_modules'): for entry_point in iter_entry_points(group='risotto_modules'):
service_name, module_name = entry_point.name.split('.') service_name, module_name = entry_point.name.split('.')
if limit_services is None or service_name in limit_services: if limit_services is None or service_name in limit_services:
setattr(self, module_name, entry_point.load()) self.services[service_name][module_name] = entry_point.load()
self.services[service_name].append(module_name)
self.modules_loaded = True self.modules_loaded = True
#
def get_services(self): # def get_services(self):
if not self.services_loaded: # if not self.services_loaded:
self.load_services() # self.load_services()
return [(s, getattr(self, s)) for s in self.services] # return [(service, getattr(self, service)) for service in self.services]
def get_modules(self, def get_modules(self,
limit_services: Optional[List[str]]=None, limit_services: Optional[List[str]]=None,
) -> List[str]: ) -> List[str]:
if not self.modules_loaded: if not self.modules_loaded:
self.load_modules(limit_services=limit_services) self.load_modules(limit_services=limit_services)
return [(module + '.' + submodule, getattr(self, submodule)) for module, submodules in self.services.items() for submodule in submodules] return [(module + '.' + submodule, entry_point) for module, submodules in self.services.items() for submodule, entry_point in submodules.items()]
def get_services_list(self): def get_services_list(self):
return self.services.keys() return self.services.keys()
@ -199,7 +198,8 @@ class RegisterDispatcher:
raise RegistrationError(_(f'the message {message} not exists')) raise RegistrationError(_(f'the message {message} not exists'))
# xxx submodule can only be register with v1.yyy.xxx..... message # xxx submodule can only be register with v1.yyy.xxx..... message
risotto_module_name, submodule_name = function.__module__.split('.')[-3:-1] full_module_name = function.__module__
risotto_module_name, submodule_name = full_module_name.split('.')[-3:-1]
module_name = risotto_module_name.split('_')[-1] module_name = risotto_module_name.split('_')[-1]
message_module, message_submodule, message_name = message.split('.', 2) message_module, message_submodule, message_name = message.split('.', 2)
if message_module not in self.risotto_modules: if message_module not in self.risotto_modules:
@ -224,6 +224,7 @@ class RegisterDispatcher:
register(version, register(version,
message, message,
f'{module_name}.{submodule_name}', f'{module_name}.{submodule_name}',
full_module_name,
function, function,
function_args, function_args,
notification, notification,
@ -233,11 +234,13 @@ class RegisterDispatcher:
version: str, version: str,
message: str, message: str,
module_name: str, module_name: str,
full_module_name: str,
function: Callable, function: Callable,
function_args: list, function_args: list,
notification: Optional[str], notification: Optional[str],
): ):
self.messages[version][message]['module'] = module_name self.messages[version][message]['module'] = module_name
self.messages[version][message]['full_module_name'] = full_module_name
self.messages[version][message]['function'] = function self.messages[version][message]['function'] = function
self.messages[version][message]['arguments'] = function_args self.messages[version][message]['arguments'] = function_args
if notification: if notification:
@ -247,6 +250,7 @@ class RegisterDispatcher:
version: str, version: str,
message: str, message: str,
module_name: str, module_name: str,
full_module_name: str,
function: Callable, function: Callable,
function_args: list, function_args: list,
notification: Optional[str], notification: Optional[str],
@ -255,8 +259,10 @@ class RegisterDispatcher:
self.messages[version][message]['functions'] = [] self.messages[version][message]['functions'] = []
dico = {'module': module_name, dico = {'module': module_name,
'full_module_name': full_module_name,
'function': function, 'function': function,
'arguments': function_args} 'arguments': function_args,
}
if notification and notification: if notification and notification:
dico['notification'] = notification dico['notification'] = notification
self.messages[version][message]['functions'].append(dico) self.messages[version][message]['functions'].append(dico)
@ -271,7 +277,7 @@ class RegisterDispatcher:
try: try:
self.injected_self[submodule_name] = module.Risotto(test) self.injected_self[submodule_name] = module.Risotto(test)
except AttributeError as err: except AttributeError as err:
raise RegistrationError(_(f'unable to register the module {submodule_name}, this module must have Risotto class')) print(_(f'unable to register the module {submodule_name}, this module must have Risotto class'))
def validate(self): def validate(self):
""" check if all messages have a function """ check if all messages have a function
@ -300,7 +306,7 @@ class RegisterDispatcher:
) )
if truncate: if truncate:
async with connection.transaction(): async with connection.transaction():
await connection.execute('TRUNCATE InfraServer, InfraSite, InfraZone, Log, ProviderDeployment, ProviderFactoryCluster, ProviderFactoryClusterNode, SettingApplicationservice, SettingApplicationServiceDependency, SettingRelease, SettingServer, SettingServermodel, SettingSource, UserRole, UserRoleURI, UserURI, UserUser, InfraServermodel, ProviderZone, ProviderServer, ProviderSource, ProviderApplicationservice ProviderServermodel') await connection.execute('TRUNCATE InfraServer, InfraSite, InfraZone, Log, ProviderDeployment, ProviderFactoryCluster, ProviderFactoryClusterNode, SettingApplicationservice, SettingApplicationServiceDependency, SettingRelease, SettingServer, SettingServermodel, SettingSource, UserRole, UserRoleURI, UserURI, UserUser, InfraServermodel, ProviderZone, ProviderServer, ProviderSource, ProviderApplicationservice, ProviderServermodel')
async with connection.transaction(): async with connection.transaction():
for submodule_name, module in self.injected_self.items(): for submodule_name, module in self.injected_self.items():
risotto_context = Context() risotto_context = Context()
@ -309,7 +315,7 @@ class RegisterDispatcher:
risotto_context.type = None risotto_context.type = None
risotto_context.connection = connection risotto_context.connection = connection
risotto_context.module = submodule_name.split('.', 1)[0] risotto_context.module = submodule_name.split('.', 1)[0]
info_msg = _(f'in module risotto_{submodule_name}.on_join') info_msg = _(f'in function risotto_{submodule_name}.on_join')
await log.info_msg(risotto_context, await log.info_msg(risotto_context,
None, None,
info_msg) info_msg)

View File

@ -1,42 +0,0 @@
from asyncio import get_event_loop, ensure_future
from json import loads
from .context import Context
from .config import get_config
from .utils import _
class Remote:
async def register_remote(self) -> None:
print()
print(_('======== Registered remote event ========'))
self.listened_connection = await self.pool.acquire()
for version, messages in self.messages.items():
for message, message_infos in messages.items():
# event not emit locally
if message_infos['pattern'] == 'event':
module, submodule, submessage = message.split('.', 2)
if f'{module}.{submodule}' not in self.injected_self:
uri = f'{version}.{message}'
print(f' - {uri}')
await self.listened_connection.add_listener(uri, self.to_async_publish)
def to_async_publish(self,
con: 'asyncpg.connection.Connection',
pid: int,
uri: str,
payload: str,
) -> None:
version, message = uri.split('.', 1)
loop = get_event_loop()
remote_kw = loads(payload)
context = Context()
for key, value in remote_kw['context'].items():
setattr(context, key, value)
callback = lambda: ensure_future(self.publish(version,
message,
context,
**remote_kw['kwargs'],
))
loop.call_soon(callback)

View File

@ -392,7 +392,6 @@ async def test_server_created_base():
release_distribution='last', release_distribution='last',
site_name='site_1', site_name='site_1',
zones_name=['zones'], zones_name=['zones'],
zones_ip=['1.1.1.1'],
) )
assert list(config_module.server) == [server_name] assert list(config_module.server) == [server_name]
assert set(config_module.server[server_name]) == {'server', 'server_to_deploy', 'funcs_file'} assert set(config_module.server[server_name]) == {'server', 'server_to_deploy', 'funcs_file'}
@ -420,7 +419,6 @@ async def test_server_created_own_sm():
release_distribution='last', release_distribution='last',
site_name='site_1', site_name='site_1',
zones_name=['zones'], zones_name=['zones'],
zones_ip=['1.1.1.1'],
) )
assert list(config_module.server) == [server_name] assert list(config_module.server) == [server_name]
assert set(config_module.server[server_name]) == {'server', 'server_to_deploy', 'funcs_file'} assert set(config_module.server[server_name]) == {'server', 'server_to_deploy', 'funcs_file'}
@ -469,7 +467,6 @@ async def test_server_configuration_get():
release_distribution='last', release_distribution='last',
site_name='site_1', site_name='site_1',
zones_name=['zones'], zones_name=['zones'],
zones_ip=['1.1.1.1'],
) )
# #
await config_module.server[server_name]['server'].property.read_write() await config_module.server[server_name]['server'].property.read_write()
@ -515,7 +512,6 @@ async def test_server_configuration_deployed():
release_distribution='last', release_distribution='last',
site_name='site_1', site_name='site_1',
zones_name=['zones'], zones_name=['zones'],
zones_ip=['1.1.1.1'],
) )
# #
await config_module.server[server_name]['server'].property.read_write() await config_module.server[server_name]['server'].property.read_write()