Compare commits

..

11 Commits

6 changed files with 223 additions and 185 deletions

5
debian/changelog vendored
View File

@ -1,5 +0,0 @@
risotto (0.1) unstable; urgency=low
* first version
-- Cadoles <contact@cadoles.com> Fri, 20 Mar 2020 15:18:25 +0100

View File

@ -1,6 +1,7 @@
from os import environ from os import environ
from os.path import isfile from os.path import isfile
from configobj import ConfigObj from configobj import ConfigObj
from uuid import uuid4
CONFIG_FILE = environ.get('CONFIG_FILE', '/etc/risotto/risotto.conf') CONFIG_FILE = environ.get('CONFIG_FILE', '/etc/risotto/risotto.conf')
@ -20,10 +21,6 @@ if 'CONFIGURATION_DIR' in environ:
CONFIGURATION_DIR = environ['CONFIGURATION_DIR'] CONFIGURATION_DIR = environ['CONFIGURATION_DIR']
else: else:
CONFIGURATION_DIR = config.get('CONFIGURATION_DIR', '/srv/risotto/configurations') CONFIGURATION_DIR = config.get('CONFIGURATION_DIR', '/srv/risotto/configurations')
if 'PROVIDER_FACTORY_CONFIG_DIR' in environ:
PROVIDER_FACTORY_CONFIG_DIR = environ['PROVIDER_FACTORY_CONFIG_DIR']
else:
PROVIDER_FACTORY_CONFIG_DIR = config.get('PROVIDER_FACTORY_CONFIG_DIR', '/srv/factory')
if 'DEFAULT_USER' in environ: if 'DEFAULT_USER' in environ:
DEFAULT_USER = environ['DEFAULT_USER'] DEFAULT_USER = environ['DEFAULT_USER']
else: else:
@ -52,6 +49,18 @@ if 'TIRAMISU_DB_USER' in environ:
TIRAMISU_DB_USER = environ['TIRAMISU_DB_USER'] TIRAMISU_DB_USER = environ['TIRAMISU_DB_USER']
else: else:
TIRAMISU_DB_USER = config.get('TIRAMISU_DB_USER', 'tiramisu') TIRAMISU_DB_USER = config.get('TIRAMISU_DB_USER', 'tiramisu')
if 'CELERYRISOTTO_DB_NAME' in environ:
CELERYRISOTTO_DB_NAME = environ['CELERYRISOTTO_DB_NAME']
else:
CELERYRISOTTO_DB_NAME = config.get('CELERYRISOTTO_DB_NAME', None)
if 'CELERYRISOTTO_DB_PASSWORD' in environ:
CELERYRISOTTO_DB_PASSWORD = environ['CELERYRISOTTO_DB_PASSWORD']
else:
CELERYRISOTTO_DB_PASSWORD = config.get('CELERYRISOTTO_DB_PASSWORD', None)
if 'CELERYRISOTTO_DB_USER' in environ:
CELERYRISOTTO_DB_USER = environ['CELERYRISOTTO_DB_USER']
else:
CELERYRISOTTO_DB_USER = config.get('CELERYRISOTTO_DB_USER', None)
if 'DB_ADDRESS' in environ: if 'DB_ADDRESS' in environ:
DB_ADDRESS = environ['DB_ADDRESS'] DB_ADDRESS = environ['DB_ADDRESS']
else: else:
@ -76,6 +85,32 @@ if 'TMP_DIR' in environ:
TMP_DIR = environ['TMP_DIR'] TMP_DIR = environ['TMP_DIR']
else: else:
TMP_DIR = config.get('TMP_DIR', '/tmp') TMP_DIR = config.get('TMP_DIR', '/tmp')
if 'IMAGE_PATH' in environ:
IMAGE_PATH = environ['IMAGE_PATH']
else:
IMAGE_PATH = config.get('IMAGE_PATH', '/tmp')
if 'PASSWORD_ADMIN_USERNAME' in environ:
PASSWORD_ADMIN_USERNAME = environ['PASSWORD_ADMIN_USERNAME']
else:
PASSWORD_ADMIN_USERNAME = config.get('PASSWORD_ADMIN_USERNAME', 'risotto')
if 'PASSWORD_ADMIN_EMAIL' in environ:
PASSWORD_ADMIN_EMAIL = environ['PASSWORD_ADMIN_EMAIL']
else:
# this parameter is mandatory
PASSWORD_ADMIN_EMAIL = config['PASSWORD_ADMIN_EMAIL']
if 'PASSWORD_ADMIN_PASSWORD' in environ:
PASSWORD_ADMIN_PASSWORD = environ['PASSWORD_ADMIN_PASSWORD']
else:
# this parameter is mandatory
PASSWORD_ADMIN_PASSWORD = config['PASSWORD_ADMIN_PASSWORD']
if 'PASSWORD_DEVICE_IDENTIFIER' in environ:
PASSWORD_DEVICE_IDENTIFIER = environ['PASSWORD_DEVICE_IDENTIFIER']
else:
PASSWORD_DEVICE_IDENTIFIER = config.get('PASSWORD_DEVICE_IDENTIFIER', uuid4())
if 'PASSWORD_URL' in environ:
PASSWORD_URL = environ['PASSWORD_URL']
else:
PASSWORD_URL = config.get('PASSWORD_URL', 'https://localhost:8001/')
def dsn_factory(database, user, password, address=DB_ADDRESS): def dsn_factory(database, user, password, address=DB_ADDRESS):
@ -85,6 +120,7 @@ def dsn_factory(database, user, password, address=DB_ADDRESS):
_config = {'database': {'dsn': dsn_factory(RISOTTO_DB_NAME, RISOTTO_DB_USER, RISOTTO_DB_PASSWORD), _config = {'database': {'dsn': dsn_factory(RISOTTO_DB_NAME, RISOTTO_DB_USER, RISOTTO_DB_PASSWORD),
'tiramisu_dsn': dsn_factory(TIRAMISU_DB_NAME, TIRAMISU_DB_USER, TIRAMISU_DB_PASSWORD), 'tiramisu_dsn': dsn_factory(TIRAMISU_DB_NAME, TIRAMISU_DB_USER, TIRAMISU_DB_PASSWORD),
'celery_dsn': dsn_factory(CELERYRISOTTO_DB_NAME, CELERYRISOTTO_DB_USER, CELERYRISOTTO_DB_PASSWORD)
}, },
'http_server': {'port': RISOTTO_PORT, 'http_server': {'port': RISOTTO_PORT,
'default_user': DEFAULT_USER}, 'default_user': DEFAULT_USER},
@ -97,13 +133,20 @@ _config = {'database': {'dsn': dsn_factory(RISOTTO_DB_NAME, RISOTTO_DB_USER, RIS
'sql_dir': SQL_DIR, 'sql_dir': SQL_DIR,
'tmp_dir': TMP_DIR, 'tmp_dir': TMP_DIR,
}, },
'password': {'admin_username': PASSWORD_ADMIN_USERNAME,
'admin_email': PASSWORD_ADMIN_EMAIL,
'admin_password': PASSWORD_ADMIN_PASSWORD,
'device_identifier': PASSWORD_DEVICE_IDENTIFIER,
'service_url': PASSWORD_URL,
},
'cache': {'root_path': CACHE_ROOT_PATH}, 'cache': {'root_path': CACHE_ROOT_PATH},
'servermodel': {'internal_source_path': SRV_SEED_PATH, 'servermodel': {'internal_source_path': SRV_SEED_PATH,
'internal_source': 'internal'}, 'internal_source': 'internal'},
'submodule': {'allow_insecure_https': False, 'submodule': {'allow_insecure_https': False,
'pki': '192.168.56.112'}, 'pki': '192.168.56.112'},
'provider': {'factory_configuration_dir': PROVIDER_FACTORY_CONFIG_DIR, 'provider': {'factory_configuration_filename': 'infra.json',
'factory_configuration_filename': 'infra.json'}, 'packer_filename': 'recipe.json',
'risotto_images_dir': IMAGE_PATH},
} }

View File

@ -39,9 +39,9 @@ class Controller:
**kwargs, **kwargs,
): ):
""" a wrapper to dispatcher's publish""" """ a wrapper to dispatcher's publish"""
version, message = uri.split('.', 1)
if args: if args:
raise ValueError(_(f'the URI "{uri}" can only be published with keyword arguments')) raise ValueError(_(f'the URI "{uri}" can only be published with keyword arguments'))
version, message = uri.split('.', 1)
await dispatcher.publish(version, await dispatcher.publish(version,
message, message,
risotto_context, risotto_context,

View File

@ -4,6 +4,7 @@ try:
except: except:
from tiramisu import Config from tiramisu import Config
from tiramisu.error import ValueOptionError from tiramisu.error import ValueOptionError
from asyncio import get_event_loop, ensure_future
from traceback import print_exc from traceback import print_exc
from copy import copy from copy import copy
from typing import Dict, Callable, List, Optional from typing import Dict, Callable, List, Optional
@ -15,7 +16,6 @@ from .logger import log
from .config import get_config from .config import get_config
from .context import Context from .context import Context
from . import register from . import register
from .remote import Remote
class CallDispatcher: class CallDispatcher:
@ -79,7 +79,7 @@ class CallDispatcher:
""" execute the function associate with specified uri """ execute the function associate with specified uri
arguments are validate before arguments are validate before
""" """
risotto_context = self.build_new_context(old_risotto_context, risotto_context = self.build_new_context(old_risotto_context.__dict__,
version, version,
message, message,
'rpc', 'rpc',
@ -88,20 +88,35 @@ class CallDispatcher:
raise CallError(_(f'cannot find version of message "{version}"')) raise CallError(_(f'cannot find version of message "{version}"'))
if message not in self.messages[version]: if message not in self.messages[version]:
raise CallError(_(f'cannot find message "{version}.{message}"')) raise CallError(_(f'cannot find message "{version}.{message}"'))
function_objs = [self.messages[version][message]] function_obj = self.messages[version][message]
# do not start a new database connection # do not start a new database connection
if hasattr(old_risotto_context, 'connection'): if hasattr(old_risotto_context, 'connection'):
risotto_context.connection = old_risotto_context.connection risotto_context.connection = old_risotto_context.connection
return await self.launch(version, await self.check_message_type(risotto_context,
message,
risotto_context,
check_role,
kwargs, kwargs,
function_objs, )
config_arguments = await self.load_kwargs_to_config(risotto_context,
f'{version}.{message}',
kwargs,
check_role,
internal, internal,
) )
return await self.launch(risotto_context,
kwargs,
config_arguments,
function_obj,
)
else: else:
try: try:
await self.check_message_type(risotto_context,
kwargs,
)
config_arguments = await self.load_kwargs_to_config(risotto_context,
f'{version}.{message}',
kwargs,
check_role,
internal,
)
async with self.pool.acquire() as connection: async with self.pool.acquire() as connection:
await connection.set_type_codec( await connection.set_type_codec(
'json', 'json',
@ -111,13 +126,10 @@ class CallDispatcher:
) )
risotto_context.connection = connection risotto_context.connection = connection
async with connection.transaction(): async with connection.transaction():
return await self.launch(version, return await self.launch(risotto_context,
message,
risotto_context,
check_role,
kwargs, kwargs,
function_objs, config_arguments,
internal, function_obj,
) )
except CallError as err: except CallError as err:
raise err raise err
@ -139,43 +151,80 @@ class CallDispatcher:
class PublishDispatcher: class PublishDispatcher:
async def register_remote(self) -> None:
print()
print(_('======== Registered remote event ========'))
self.listened_connection = await self.pool.acquire()
for version, messages in self.messages.items():
for message, message_infos in messages.items():
# event not emit locally
if message_infos['pattern'] == 'event' and 'functions' in message_infos and message_infos['functions']:
# module, submodule, submessage = message.split('.', 2)
# if f'{module}.{submodule}' not in self.injected_self:
uri = f'{version}.{message}'
print(f' - {uri}')
await self.listened_connection.add_listener(uri,
self.to_async_publish,
)
async def publish(self, async def publish(self,
version: str, version: str,
message: str, message: str,
old_risotto_context: Context, risotto_context: Context,
check_role: bool=False,
internal: bool=True,
**kwargs, **kwargs,
) -> None: ) -> None:
risotto_context = self.build_new_context(old_risotto_context, if version not in self.messages or message not in self.messages[version]:
raise ValueError(_(f'cannot find URI "{version}.{message}"'))
# publish to remote
remote_kw = dumps({'kwargs': kwargs,
'context': {'username': risotto_context.username,
'paths': risotto_context.paths,
}
})
# FIXME should be better :/
remote_kw = remote_kw.replace("'", "''")
await risotto_context.connection.execute(f'NOTIFY "{version}.{message}", \'{remote_kw}\'')
def to_async_publish(self,
con: 'asyncpg.connection.Connection',
pid: int,
uri: str,
payload: str,
) -> None:
version, message = uri.split('.', 1)
loop = get_event_loop()
remote_kw = loads(payload)
risotto_context = self.build_new_context(remote_kw['context'],
version, version,
message, message,
'event', 'event',
) )
try: callback = lambda: ensure_future(self._publish(version,
function_objs = self.messages[version][message].get('functions', [])
except KeyError:
raise ValueError(_(f'cannot find message {version}.{message}'))
# do not start a new database connection
if hasattr(old_risotto_context, 'connection'):
# publish to remove
remote_kw = dumps({'kwargs': kwargs,
'context': risotto_context.__dict__,
})
risotto_context.connection = old_risotto_context.connection
# FIXME should be better :/
remote_kw = remote_kw.replace("'", "''")
await risotto_context.connection.execute(f'NOTIFY "{version}.{message}", \'{remote_kw}\'')
return await self.launch(version,
message, message,
risotto_context, risotto_context,
check_role, **remote_kw['kwargs'],
))
loop.call_soon(callback)
async def _publish(self,
version: str,
message: str,
risotto_context: Context,
**kwargs,
) -> None:
config_arguments = await self.load_kwargs_to_config(risotto_context,
f'{version}.{message}',
kwargs, kwargs,
function_objs, False,
internal, False,
) )
for function_obj in self.messages[version][message]['functions']:
async with self.pool.acquire() as connection: async with self.pool.acquire() as connection:
try: try:
await self.check_message_type(risotto_context,
kwargs,
)
await connection.set_type_codec( await connection.set_type_codec(
'json', 'json',
encoder=dumps, encoder=dumps,
@ -184,13 +233,10 @@ class PublishDispatcher:
) )
risotto_context.connection = connection risotto_context.connection = connection
async with connection.transaction(): async with connection.transaction():
return await self.launch(version, await self.launch(risotto_context,
message,
risotto_context,
check_role,
kwargs, kwargs,
function_objs, config_arguments,
internal, function_obj,
) )
except CallError as err: except CallError as err:
pass pass
@ -211,14 +257,13 @@ class PublishDispatcher:
class Dispatcher(register.RegisterDispatcher, class Dispatcher(register.RegisterDispatcher,
Remote,
CallDispatcher, CallDispatcher,
PublishDispatcher): PublishDispatcher):
""" Manage message (call or publish) """ Manage message (call or publish)
so launch a function when a message is called so launch a function when a message is called
""" """
def build_new_context(self, def build_new_context(self,
old_risotto_context: Context, context: dict,
version: str, version: str,
message: str, message: str,
type: str, type: str,
@ -227,8 +272,8 @@ class Dispatcher(register.RegisterDispatcher,
""" """
uri = version + '.' + message uri = version + '.' + message
risotto_context = Context() risotto_context = Context()
risotto_context.username = old_risotto_context.username risotto_context.username = context['username']
risotto_context.paths = copy(old_risotto_context.paths) risotto_context.paths = copy(context['paths'])
risotto_context.paths.append(uri) risotto_context.paths.append(uri)
risotto_context.uri = uri risotto_context.uri = uri
risotto_context.type = type risotto_context.type = type
@ -304,7 +349,7 @@ class Dispatcher(register.RegisterDispatcher,
sql = ''' sql = '''
SELECT UserId SELECT UserId
FROM UserUser FROM UserUser
WHERE UserLogin = $1 WHERE Login = $1
''' '''
user_id = await connection.fetchval(sql, user_id = await connection.fetchval(sql,
user_login) user_login)
@ -342,29 +387,16 @@ class Dispatcher(register.RegisterDispatcher,
raise NotAllowedError(_(f'You ({user_login}) don\'t have any authorisation to access to "{uri}"')) raise NotAllowedError(_(f'You ({user_login}) don\'t have any authorisation to access to "{uri}"'))
async def launch(self, async def launch(self,
version: str,
message: str,
risotto_context: Context, risotto_context: Context,
check_role: bool,
kwargs: Dict, kwargs: Dict,
function_objs: List, config_arguments: dict,
internal: bool, function_obj: Callable,
) -> Optional[Dict]: ) -> Optional[Dict]:
await self.check_message_type(risotto_context, # so send the message
kwargs)
config_arguments = await self.load_kwargs_to_config(risotto_context,
f'{version}.{message}',
kwargs,
check_role,
internal,
)
# config is ok, so send the message
for function_obj in function_objs:
function = function_obj['function'] function = function_obj['function']
submodule_name = function_obj['module'] risotto_context.module = function_obj['module'].split('.', 1)[0]
function_name = function.__name__ function_name = function.__name__
risotto_context.module = submodule_name.split('.', 1)[0] info_msg = _(f"in function {function_obj['full_module_name']}.{function_name}")
info_msg = _(f'in module {submodule_name}.{function_name}')
# build argument for this function # build argument for this function
if risotto_context.type == 'rpc': if risotto_context.type == 'rpc':
kw = config_arguments kw = config_arguments
@ -381,12 +413,14 @@ class Dispatcher(register.RegisterDispatcher,
await self.valid_call_returns(risotto_context, await self.valid_call_returns(risotto_context,
function, function,
returns, returns,
kwargs) kwargs,
)
# log the success # log the success
await log.info_msg(risotto_context, await log.info_msg(risotto_context,
{'arguments': kwargs, {'arguments': kwargs,
'returns': returns}, 'returns': returns},
info_msg) info_msg,
)
# notification # notification
if function_obj.get('notification'): if function_obj.get('notification'):
notif_version, notif_message = function_obj['notification'].split('.', 1) notif_version, notif_message = function_obj['notification'].split('.', 1)
@ -398,7 +432,8 @@ class Dispatcher(register.RegisterDispatcher,
await self.publish(notif_version, await self.publish(notif_version,
notif_message, notif_message,
risotto_context, risotto_context,
**ret) **ret,
)
if risotto_context.type == 'rpc': if risotto_context.type == 'rpc':
return returns return returns

View File

@ -199,7 +199,8 @@ class RegisterDispatcher:
raise RegistrationError(_(f'the message {message} not exists')) raise RegistrationError(_(f'the message {message} not exists'))
# xxx submodule can only be register with v1.yyy.xxx..... message # xxx submodule can only be register with v1.yyy.xxx..... message
risotto_module_name, submodule_name = function.__module__.split('.')[-3:-1] full_module_name = function.__module__
risotto_module_name, submodule_name = full_module_name.split('.')[-3:-1]
module_name = risotto_module_name.split('_')[-1] module_name = risotto_module_name.split('_')[-1]
message_module, message_submodule, message_name = message.split('.', 2) message_module, message_submodule, message_name = message.split('.', 2)
if message_module not in self.risotto_modules: if message_module not in self.risotto_modules:
@ -224,6 +225,7 @@ class RegisterDispatcher:
register(version, register(version,
message, message,
f'{module_name}.{submodule_name}', f'{module_name}.{submodule_name}',
full_module_name,
function, function,
function_args, function_args,
notification, notification,
@ -233,11 +235,13 @@ class RegisterDispatcher:
version: str, version: str,
message: str, message: str,
module_name: str, module_name: str,
full_module_name: str,
function: Callable, function: Callable,
function_args: list, function_args: list,
notification: Optional[str], notification: Optional[str],
): ):
self.messages[version][message]['module'] = module_name self.messages[version][message]['module'] = module_name
self.messages[version][message]['full_module_name'] = full_module_name
self.messages[version][message]['function'] = function self.messages[version][message]['function'] = function
self.messages[version][message]['arguments'] = function_args self.messages[version][message]['arguments'] = function_args
if notification: if notification:
@ -247,6 +251,7 @@ class RegisterDispatcher:
version: str, version: str,
message: str, message: str,
module_name: str, module_name: str,
full_module_name: str,
function: Callable, function: Callable,
function_args: list, function_args: list,
notification: Optional[str], notification: Optional[str],
@ -255,8 +260,10 @@ class RegisterDispatcher:
self.messages[version][message]['functions'] = [] self.messages[version][message]['functions'] = []
dico = {'module': module_name, dico = {'module': module_name,
'full_module_name': full_module_name,
'function': function, 'function': function,
'arguments': function_args} 'arguments': function_args,
}
if notification and notification: if notification and notification:
dico['notification'] = notification dico['notification'] = notification
self.messages[version][message]['functions'].append(dico) self.messages[version][message]['functions'].append(dico)
@ -300,7 +307,7 @@ class RegisterDispatcher:
) )
if truncate: if truncate:
async with connection.transaction(): async with connection.transaction():
await connection.execute('TRUNCATE InfraServer, InfraSite, InfraZone, Log, ProviderDeployment, ProviderFactoryCluster, ProviderFactoryClusterNode, SettingApplicationservice, SettingApplicationServiceDependency, SettingRelease, SettingServer, SettingServermodel, SettingSource, UserRole, UserRoleURI, UserURI, UserUser, InfraServermodel, ProviderZone, ProviderServer, ProviderSource, ProviderApplicationservice ProviderServermodel') await connection.execute('TRUNCATE InfraServer, InfraSite, InfraZone, Log, ProviderDeployment, ProviderFactoryCluster, ProviderFactoryClusterNode, SettingApplicationservice, SettingApplicationServiceDependency, SettingRelease, SettingServer, SettingServermodel, SettingSource, UserRole, UserRoleURI, UserURI, UserUser, InfraServermodel, ProviderZone, ProviderServer, ProviderSource, ProviderApplicationservice, ProviderServermodel')
async with connection.transaction(): async with connection.transaction():
for submodule_name, module in self.injected_self.items(): for submodule_name, module in self.injected_self.items():
risotto_context = Context() risotto_context = Context()
@ -309,7 +316,7 @@ class RegisterDispatcher:
risotto_context.type = None risotto_context.type = None
risotto_context.connection = connection risotto_context.connection = connection
risotto_context.module = submodule_name.split('.', 1)[0] risotto_context.module = submodule_name.split('.', 1)[0]
info_msg = _(f'in module risotto_{submodule_name}.on_join') info_msg = _(f'in function risotto_{submodule_name}.on_join')
await log.info_msg(risotto_context, await log.info_msg(risotto_context,
None, None,
info_msg) info_msg)

View File

@ -1,42 +0,0 @@
from asyncio import get_event_loop, ensure_future
from json import loads
from .context import Context
from .config import get_config
from .utils import _
class Remote:
async def register_remote(self) -> None:
print()
print(_('======== Registered remote event ========'))
self.listened_connection = await self.pool.acquire()
for version, messages in self.messages.items():
for message, message_infos in messages.items():
# event not emit locally
if message_infos['pattern'] == 'event':
module, submodule, submessage = message.split('.', 2)
if f'{module}.{submodule}' not in self.injected_self:
uri = f'{version}.{message}'
print(f' - {uri}')
await self.listened_connection.add_listener(uri, self.to_async_publish)
def to_async_publish(self,
con: 'asyncpg.connection.Connection',
pid: int,
uri: str,
payload: str,
) -> None:
version, message = uri.split('.', 1)
loop = get_event_loop()
remote_kw = loads(payload)
context = Context()
for key, value in remote_kw['context'].items():
setattr(context, key, value)
callback = lambda: ensure_future(self.publish(version,
message,
context,
**remote_kw['kwargs'],
))
loop.call_soon(callback)