From e664dd617444c60453d98abd407bb49c6338b8c6 Mon Sep 17 00:00:00 2001 From: Emmanuel Garette Date: Sat, 12 Sep 2020 16:05:17 +0200 Subject: [PATCH] add remote support --- src/risotto/config.py | 135 ++++++++++++++++++++++++++++---------- src/risotto/controller.py | 53 +++++++-------- src/risotto/dispatcher.py | 64 ++++++++++-------- src/risotto/http.py | 12 ++-- src/risotto/register.py | 68 +++++++++++-------- src/risotto/remote.py | 87 ++++++++++-------------- 6 files changed, 242 insertions(+), 177 deletions(-) diff --git a/src/risotto/config.py b/src/risotto/config.py index a59bb21..bacd6b3 100644 --- a/src/risotto/config.py +++ b/src/risotto/config.py @@ -1,21 +1,81 @@ from os import environ +from os.path import isfile +from configobj import ConfigObj -CONFIGURATION_DIR = environ.get('CONFIGURATION_DIR', '/srv/risotto/configurations') -PROVIDER_FACTORY_CONFIG_DIR = environ.get('PROVIDER_FACTORY_CONFIG_DIR', '/srv/factory') -TMP_DIR = '/tmp' -DEFAULT_USER = environ.get('DEFAULT_USER', 'Anonymous') -RISOTTO_DB_NAME = environ.get('RISOTTO_DB_NAME', 'risotto') -RISOTTO_DB_PASSWORD = environ.get('RISOTTO_DB_PASSWORD', 'risotto') -RISOTTO_DB_USER = environ.get('RISOTTO_DB_USER', 'risotto') -TIRAMISU_DB_NAME = environ.get('TIRAMISU_DB_NAME', 'tiramisu') -TIRAMISU_DB_PASSWORD = environ.get('TIRAMISU_DB_PASSWORD', 'tiramisu') -TIRAMISU_DB_USER = environ.get('TIRAMISU_DB_USER', 'tiramisu') -DB_ADDRESS = environ.get('DB_ADDRESS', 'localhost') -MESSAGE_PATH = environ.get('MESSAGE_PATH', '/root/risotto-message/messages') -SQL_DIR = environ.get('SQL_DIR', './sql') -CACHE_ROOT_PATH = environ.get('CACHE_ROOT_PATH', '/var/cache/risotto') -SRV_SEED_PATH = environ.get('SRV_SEED_PATH', '/srv/seed') +CONFIG_FILE = environ.get('CONFIG_FILE', '/etc/risotto/risotto.conf') + + +if isfile(CONFIG_FILE): + config = ConfigObj(CONFIG_FILE) +else: + config = {} + + +if 'RISOTTO_PORT' in environ: + RISOTTO_PORT = environ['RISOTTO_PORT'] +else: + RISOTTO_PORT = config.get('RISOTTO_PORT', 8080) +if 'CONFIGURATION_DIR' in environ: + CONFIGURATION_DIR = environ['CONFIGURATION_DIR'] +else: + CONFIGURATION_DIR = config.get('CONFIGURATION_DIR', '/srv/risotto/configurations') +if 'PROVIDER_FACTORY_CONFIG_DIR' in environ: + PROVIDER_FACTORY_CONFIG_DIR = environ['PROVIDER_FACTORY_CONFIG_DIR'] +else: + PROVIDER_FACTORY_CONFIG_DIR = config.get('PROVIDER_FACTORY_CONFIG_DIR', '/srv/factory') +if 'DEFAULT_USER' in environ: + DEFAULT_USER = environ['DEFAULT_USER'] +else: + DEFAULT_USER = config.get('DEFAULT_USER', 'Anonymous') +if 'RISOTTO_DB_NAME' in environ: + RISOTTO_DB_NAME = environ['RISOTTO_DB_NAME'] +else: + RISOTTO_DB_NAME = config.get('RISOTTO_DB_NAME', 'risotto') +if 'RISOTTO_DB_PASSWORD' in environ: + RISOTTO_DB_PASSWORD = environ['RISOTTO_DB_PASSWORD'] +else: + RISOTTO_DB_PASSWORD = config.get('RISOTTO_DB_PASSWORD', 'risotto') +if 'RISOTTO_DB_USER' in environ: + RISOTTO_DB_USER = environ['RISOTTO_DB_USER'] +else: + RISOTTO_DB_USER = config.get('RISOTTO_DB_USER', 'risotto') +if 'TIRAMISU_DB_NAME' in environ: + TIRAMISU_DB_NAME = environ['TIRAMISU_DB_NAME'] +else: + TIRAMISU_DB_NAME = config.get('TIRAMISU_DB_NAME', 'tiramisu') +if 'TIRAMISU_DB_PASSWORD' in environ: + TIRAMISU_DB_PASSWORD = environ['TIRAMISU_DB_PASSWORD'] +else: + TIRAMISU_DB_PASSWORD = config.get('TIRAMISU_DB_PASSWORD', 'tiramisu') +if 'TIRAMISU_DB_USER' in environ: + TIRAMISU_DB_USER = environ['TIRAMISU_DB_USER'] +else: + TIRAMISU_DB_USER = config.get('TIRAMISU_DB_USER', 'tiramisu') +if 'DB_ADDRESS' in environ: + DB_ADDRESS = environ['DB_ADDRESS'] +else: + DB_ADDRESS = config.get('DB_ADDRESS', 'localhost') +if 'MESSAGE_PATH' in environ: + MESSAGE_PATH = environ['MESSAGE_PATH'] +else: + MESSAGE_PATH = config.get('MESSAGE_PATH', '/root/risotto-message/messages') +if 'SQL_DIR' in environ: + SQL_DIR = environ['SQL_DIR'] +else: + SQL_DIR = config.get('SQL_DIR', './sql') +if 'CACHE_ROOT_PATH' in environ: + CACHE_ROOT_PATH = environ['CACHE_ROOT_PATH'] +else: + CACHE_ROOT_PATH = config.get('CACHE_ROOT_PATH', '/var/cache/risotto') +if 'SRV_SEED_PATH' in environ: + SRV_SEED_PATH = environ['SRV_SEED_PATH'] +else: + SRV_SEED_PATH = config.get('SRV_SEED_PATH', '/srv/seed') +if 'TMP_DIR' in environ: + TMP_DIR = environ['TMP_DIR'] +else: + TMP_DIR = config.get('TMP_DIR', '/tmp') def dsn_factory(database, user, password, address=DB_ADDRESS): @@ -23,24 +83,29 @@ def dsn_factory(database, user, password, address=DB_ADDRESS): return f'postgres:///{database}?host={mangled_address}/&user={user}&password={password}' +_config = {'database': {'dsn': dsn_factory(RISOTTO_DB_NAME, RISOTTO_DB_USER, RISOTTO_DB_PASSWORD), + 'tiramisu_dsn': dsn_factory(TIRAMISU_DB_NAME, TIRAMISU_DB_USER, TIRAMISU_DB_PASSWORD), + }, + 'http_server': {'port': RISOTTO_PORT, + 'default_user': DEFAULT_USER}, + 'global': {'message_root_path': MESSAGE_PATH, + 'configurations_dir': CONFIGURATION_DIR, + 'debug': True, + 'internal_user': '_internal', + 'check_role': True, + 'admin_user': DEFAULT_USER, + 'sql_dir': SQL_DIR, + 'tmp_dir': TMP_DIR, + }, + 'cache': {'root_path': CACHE_ROOT_PATH}, + 'servermodel': {'internal_source_path': SRV_SEED_PATH, + 'internal_source': 'internal'}, + 'submodule': {'allow_insecure_https': False, + 'pki': '192.168.56.112'}, + 'provider': {'factory_configuration_dir': PROVIDER_FACTORY_CONFIG_DIR, + 'factory_configuration_filename': 'infra.json'}, + } + + def get_config(): - return {'database': {'dsn': dsn_factory(RISOTTO_DB_NAME, RISOTTO_DB_USER, RISOTTO_DB_PASSWORD), - 'tiramisu_dsn': dsn_factory(TIRAMISU_DB_NAME, TIRAMISU_DB_USER, TIRAMISU_DB_PASSWORD), - }, - 'http_server': {'port': 8080, - 'default_user': DEFAULT_USER}, - 'global': {'message_root_path': MESSAGE_PATH, - 'configurations_dir': CONFIGURATION_DIR, - 'debug': True, - 'internal_user': 'internal', - 'check_role': True, - 'admin_user': DEFAULT_USER, - 'sql_dir': SQL_DIR}, - 'cache': {'root_path': CACHE_ROOT_PATH}, - 'servermodel': {'internal_source_path': SRV_SEED_PATH, - 'internal_source': 'internal'}, - 'submodule': {'allow_insecure_https': False, - 'pki': '192.168.56.112'}, - 'provider': {'factory_configuration_dir': PROVIDER_FACTORY_CONFIG_DIR, - 'factory_configuration_filename': 'infra.json'}, - } + return _config diff --git a/src/risotto/controller.py b/src/risotto/controller.py index cc0a67c..c4b3296 100644 --- a/src/risotto/controller.py +++ b/src/risotto/controller.py @@ -1,8 +1,5 @@ -from .config import get_config from .dispatcher import dispatcher from .context import Context -from .remote import remote -from . import services from .utils import _ @@ -10,50 +7,48 @@ class Controller: """Common controller used to add a service in Risotto """ def __init__(self, - test: bool): - self.risotto_modules = services.get_services_list() + test: bool, + ): + pass async def call(self, uri: str, risotto_context: Context, *args, - **kwargs): + **kwargs, + ): """ a wrapper to dispatcher's call""" - version, module, message = uri.split('.', 2) - uri = module + '.' + message if args: raise ValueError(_(f'the URI "{uri}" can only be called with keyword arguments')) - if module not in self.risotto_modules: - return await remote.remote_call(module, - version, - message, - kwargs) + current_uri = risotto_context.paths[-1] + current_module = risotto_context.module + version, message = uri.split('.', 1) + module = message.split('.', 1)[0] + if current_module != module: + raise ValueError(_(f'cannot call to external module ("{module}") to the URI "{uri}" from "{current_module}"')) return await dispatcher.call(version, - uri, + message, risotto_context, - **kwargs) + **kwargs, + ) async def publish(self, uri: str, risotto_context: Context, - *args, - **kwargs): + *args, + **kwargs, + ): """ a wrapper to dispatcher's publish""" - version, module, submessage = uri.split('.', 2) version, message = uri.split('.', 1) if args: raise ValueError(_(f'the URI "{uri}" can only be published with keyword arguments')) - if module not in self.risotto_modules: - await remote.remote_call(module, - version, - submessage, - kwargs) - else: - await dispatcher.publish(version, - message, - risotto_context, - **kwargs) + await dispatcher.publish(version, + message, + risotto_context, + **kwargs, + ) async def on_join(self, - risotto_context): + risotto_context, + ): pass diff --git a/src/risotto/dispatcher.py b/src/risotto/dispatcher.py index d110063..7bc464d 100644 --- a/src/risotto/dispatcher.py +++ b/src/risotto/dispatcher.py @@ -15,8 +15,7 @@ from .logger import log from .config import get_config from .context import Context from . import register -#from .remote import Remote -import asyncpg +from .remote import Remote class CallDispatcher: @@ -83,7 +82,8 @@ class CallDispatcher: risotto_context = self.build_new_context(old_risotto_context, version, message, - 'rpc') + 'rpc', + ) if version not in self.messages: raise CallError(_(f'cannot find version of message "{version}"')) if message not in self.messages[version]: @@ -150,14 +150,20 @@ class PublishDispatcher: risotto_context = self.build_new_context(old_risotto_context, version, message, - 'event') + 'event', + ) try: function_objs = self.messages[version][message].get('functions', []) except KeyError: raise ValueError(_(f'cannot find message {version}.{message}')) # do not start a new database connection if hasattr(old_risotto_context, 'connection'): + # publish to remove + remote_kw = dumps({'kwargs': kwargs, + 'context': risotto_context.__dict__, + }) risotto_context.connection = old_risotto_context.connection + await risotto_context.connection.execute(f'NOTIFY "{version}.{message}", \'{remote_kw}\'') return await self.launch(version, message, risotto_context, @@ -166,8 +172,8 @@ class PublishDispatcher: function_objs, internal, ) - try: - async with self.pool.acquire() as connection: + async with self.pool.acquire() as connection: + try: await connection.set_type_codec( 'json', encoder=dumps, @@ -184,27 +190,26 @@ class PublishDispatcher: function_objs, internal, ) - except CallError as err: - raise err - except Exception as err: - # if there is a problem with arguments, just send an error and do nothing - if get_config()['global']['debug']: - print_exc() - async with self.pool.acquire() as connection: - await connection.set_type_codec( - 'json', - encoder=dumps, - decoder=loads, - schema='pg_catalog' - ) - risotto_context.connection = connection - async with connection.transaction(): - await log.error_msg(risotto_context, kwargs, err) - raise err + except CallError as err: + pass + except Exception as err: + # if there is a problem with arguments, log and do nothing + if get_config()['global']['debug']: + print_exc() + async with self.pool.acquire() as connection: + await connection.set_type_codec( + 'json', + encoder=dumps, + decoder=loads, + schema='pg_catalog' + ) + risotto_context.connection = connection + async with connection.transaction(): + await log.error_msg(risotto_context, kwargs, err) class Dispatcher(register.RegisterDispatcher, -# Remote, + Remote, CallDispatcher, PublishDispatcher): """ Manage message (call or publish) @@ -214,7 +219,8 @@ class Dispatcher(register.RegisterDispatcher, old_risotto_context: Context, version: str, message: str, - type: str): + type: str, + ) -> Context: """ This is a new call or a new publish, so create a new context """ uri = version + '.' + message @@ -230,7 +236,8 @@ class Dispatcher(register.RegisterDispatcher, async def check_message_type(self, risotto_context: Context, - kwargs: Dict): + kwargs: Dict, + ) -> None: if self.messages[risotto_context.version][risotto_context.message]['pattern'] != risotto_context.type: msg = _(f'{risotto_context.uri} is not a {risotto_context.type} message') await log.error_msg(risotto_context, kwargs, msg) @@ -352,9 +359,10 @@ class Dispatcher(register.RegisterDispatcher, # config is ok, so send the message for function_obj in function_objs: function = function_obj['function'] - module_name = function.__module__.split('.')[-2] + submodule_name = function_obj['module'] function_name = function.__name__ - info_msg = _(f'in module {module_name}.{function_name}') + risotto_context.module = submodule_name.split('.', 1)[0] + info_msg = _(f'in module {submodule_name}.{function_name}') # build argument for this function if risotto_context.type == 'rpc': kw = config_arguments diff --git a/src/risotto/http.py b/src/risotto/http.py index 10ea270..dae48d8 100644 --- a/src/risotto/http.py +++ b/src/risotto/http.py @@ -51,8 +51,9 @@ class extra_route_handler: function_name = cls.function.__module__ # if not 'api' function if function_name != 'risotto.http': - module_name = function_name.split('.')[-2] - kwargs['self'] = dispatcher.injected_self[module_name] + risotto_module_name, submodule_name = function_name.split('.', 2)[:-1] + module_name = risotto_module_name.split('_')[-1] + kwargs['self'] = dispatcher.injected_self[module_name + '.' + submodule_name] try: returns = await cls.function(**kwargs) except NotAllowedError as err: @@ -141,9 +142,9 @@ async def get_app(loop): versions.append(version) print() print(_('======== Registered messages ========')) - for message in messages: + for message, message_infos in messages.items(): web_message = f'/api/{version}/{message}' - pattern = dispatcher.messages[version][message]['pattern'] + pattern = message_infos['pattern'] print(f' - {web_message} ({pattern})') routes.append(post(web_message, handle)) print() @@ -168,9 +169,10 @@ async def get_app(loop): extra_handler = type(path, (extra_route_handler,), extra) routes.append(get(path, extra_handler)) print(f' - {path} (http_get)') - print() del extra_routes app.router.add_routes(routes) + await dispatcher.register_remote() + print() await dispatcher.on_join() return await loop.create_server(app.make_handler(), '*', get_config()['http_server']['port']) diff --git a/src/risotto/register.py b/src/risotto/register.py index cd7743c..20ed685 100644 --- a/src/risotto/register.py +++ b/src/risotto/register.py @@ -4,8 +4,9 @@ except: from tiramisu import Config from inspect import signature from typing import Callable, Optional, List -import asyncpg +from asyncpg import create_pool from json import dumps, loads +from pkg_resources import iter_entry_points import risotto from .utils import _ from .error import RegistrationError @@ -13,7 +14,7 @@ from .message import get_messages from .context import Context from .config import get_config from .logger import log -from pkg_resources import iter_entry_points + class Services(): services = {} @@ -45,7 +46,7 @@ class Services(): ) -> List[str]: if not self.modules_loaded: self.load_modules(limit_services=limit_services) - return [(m, getattr(self, m)) for s in self.services.values() for m in s] + return [(module + '.' + submodule, getattr(self, submodule)) for module, submodules in self.services.items() for submodule in submodules] def get_services_list(self): return self.services.keys() @@ -59,10 +60,11 @@ class Services(): test: bool=False, limit_services: Optional[List[str]]=None, ): - for module_name, module in self.get_modules(limit_services=limit_services): - dispatcher.set_module(module_name, + for submodule_name, module in self.get_modules(limit_services=limit_services): + dispatcher.set_module(submodule_name, module, - test) + test, + ) if validate: dispatcher.validate() @@ -73,7 +75,8 @@ setattr(risotto, 'services', services) def register(uris: str, - notification: str=None): + notification: str=None, + ) -> None: """ Decorator to register function to the dispatcher """ if not isinstance(uris, list): @@ -185,7 +188,8 @@ class RegisterDispatcher: version: str, message: str, notification: str, - function: Callable): + function: Callable, + ): """ register a function to an URI URI is a message """ @@ -194,14 +198,16 @@ class RegisterDispatcher: if message not in self.messages[version]: raise RegistrationError(_(f'the message {message} not exists')) - # xxx module can only be register with v1.xxxx..... message - module_name = function.__module__.split('.')[-2] - message_namespace = message.split('.', 1)[0] - message_risotto_module, message_namespace, message_name = message.split('.', 2) - if message_risotto_module not in self.risotto_modules: + # xxx submodule can only be register with v1.yyy.xxx..... message + risotto_module_name, submodule_name = function.__module__.split('.')[-3:-1] + module_name = risotto_module_name.split('_')[-1] + message_module, message_submodule, message_name = message.split('.', 2) + if message_module not in self.risotto_modules: raise RegistrationError(_(f'cannot registered the "{message}" is not "{self.risotto_modules}"')) - if self.messages[version][message]['pattern'] == 'rpc' and message_namespace != module_name: - raise RegistrationError(_(f'cannot registered the "{message}" message in module "{module_name}"')) + if self.messages[version][message]['pattern'] == 'rpc' and \ + module_name != message_module and \ + message_submodule != submodule_name: + raise RegistrationError(_(f'cannot registered the "{message}" message in submodule "{module_name}.{submodule_name}"')) # True if first argument is the risotto_context function_args = self.get_function_args(function) @@ -217,10 +223,11 @@ class RegisterDispatcher: register = self.register_event register(version, message, - module_name, + f'{module_name}.{submodule_name}', function, function_args, - notification) + notification, + ) def register_rpc(self, version: str, @@ -228,7 +235,8 @@ class RegisterDispatcher: module_name: str, function: Callable, function_args: list, - notification: Optional[str]): + notification: Optional[str], + ): self.messages[version][message]['module'] = module_name self.messages[version][message]['function'] = function self.messages[version][message]['arguments'] = function_args @@ -241,7 +249,8 @@ class RegisterDispatcher: module_name: str, function: Callable, function_args: list, - notification: Optional[str]): + notification: Optional[str], + ): if 'functions' not in self.messages[version][message]: self.messages[version][message]['functions'] = [] @@ -252,13 +261,17 @@ class RegisterDispatcher: dico['notification'] = notification self.messages[version][message]['functions'].append(dico) - def set_module(self, module_name, module, test): + def set_module(self, + submodule_name, + module, + test, + ): """ register and instanciate a new module """ try: - self.injected_self[module_name] = module.Risotto(test) + self.injected_self[submodule_name] = module.Risotto(test) except AttributeError as err: - raise RegistrationError(_(f'unable to register the module {module_name}, this module must have Risotto class')) + raise RegistrationError(_(f'unable to register the module {submodule_name}, this module must have Risotto class')) def validate(self): """ check if all messages have a function @@ -287,15 +300,16 @@ class RegisterDispatcher: ) if truncate: async with connection.transaction(): - await connection.execute('TRUNCATE applicationservicedependency, deployment, factoryclusternode, factorycluster, log, release, userrole, risottouser, roleuri, infraserver, settingserver, servermodel, site, source, uri, userrole, zone, applicationservice') + await connection.execute('TRUNCATE InfraServer, InfraSite, InfraZone, Log, ProviderDeployment, ProviderFactoryCluster, ProviderFactoryClusterNode, SettingApplicationservice, SettingApplicationServiceDependency, SettingRelease, SettingServer, SettingServermodel, SettingSource, UserRole, UserRoleURI, UserURI, UserUser, InfraServermodel, ProviderZone, ProviderServer, ProviderServermodel') async with connection.transaction(): - for module_name, module in self.injected_self.items(): + for submodule_name, module in self.injected_self.items(): risotto_context = Context() risotto_context.username = internal_user - risotto_context.paths.append(f'{module_name}.on_join') + risotto_context.paths.append(f'internal.{submodule_name}.on_join') risotto_context.type = None risotto_context.connection = connection - info_msg = _(f'in module {module_name}.on_join') + risotto_context.module = submodule_name.split('.', 1)[0] + info_msg = _(f'in module risotto_{submodule_name}.on_join') await log.info_msg(risotto_context, None, info_msg) @@ -304,7 +318,7 @@ class RegisterDispatcher: async def load(self): # valid function's arguments db_conf = get_config()['database']['dsn'] - self.pool = await asyncpg.create_pool(db_conf) + self.pool = await create_pool(db_conf) async with self.pool.acquire() as connection: async with connection.transaction(): for version, messages in self.messages.items(): diff --git a/src/risotto/remote.py b/src/risotto/remote.py index a3c2aad..dc5d358 100644 --- a/src/risotto/remote.py +++ b/src/risotto/remote.py @@ -1,61 +1,42 @@ -from aiohttp import ClientSession -from requests import get, post -from json import dumps -#from tiramisu_api import Config +from asyncio import get_event_loop, ensure_future +from json import loads +from .context import Context from .config import get_config from .utils import _ -# -# -# ALLOW_INSECURE_HTTPS = get_config()['module']['allow_insecure_https'] class Remote: - submodules = {} + async def register_remote(self) -> None: + print() + print(_('======== Registered remote event ========')) + self.listened_connection = await self.pool.acquire() + for version, messages in self.messages.items(): + for message, message_infos in messages.items(): + # event not emit locally + if message_infos['pattern'] == 'event': + module, submodule, submessage = message.split('.', 2) + if f'{module}.{submodule}' not in self.injected_self: + uri = f'{version}.{message}' + print(f' - {uri}') + await self.listened_connection.add_listener(uri, self.to_async_publish) - async def _get_config(self, - module: str, - url: str) -> None: - if module not in self.submodules: - session = ClientSession() - async with session.get(url) as resp: - if resp.status != 200: - try: - json = await resp.json() - err = json['error']['kwargs']['reason'] - except: - err = await resp.text() - raise Exception(err) - json = await resp.json() - self.submodules[module] = json - return Config(self.submodules[module]) - - async def remote_call(self, - module: str, - version: str, - submessage: str, - payload) -> dict: - try: - domain_name = get_config()['module'][module] - except KeyError: - raise ValueError(_(f'cannot find information of remote module "{module}" to access to "{version}.{module}.{submessage}"')) - remote_url = f'http://{domain_name}:8080/api/{version}' - message_url = f'{remote_url}/{submessage}' - - config = await self._get_config(module, - remote_url) - for key, value in payload.items(): - path = submessage + '.' + key - config.option(path).value.set(value) - session = ClientSession() - async with session.post(message_url, data=dumps(payload)) as resp: - response = await resp.json() - if 'error' in response: - if 'reason' in response['error']['kwargs']: - raise Exception("{}".format(response['error']['kwargs']['reason'])) - raise Exception('erreur inconnue') - return response['response'] - - -remote = Remote() + def to_async_publish(self, + con: 'asyncpg.connection.Connection', + pid: int, + uri: str, + payload: str, + ) -> None: + version, message = uri.split('.', 1) + loop = get_event_loop() + remote_kw = loads(payload) + context = Context() + for key, value in remote_kw['context'].items(): + setattr(context, key, value) + callback = lambda: ensure_future(self.publish(version, + message, + context, + **remote_kw['kwargs'], + )) + loop.call_soon(callback)