diff --git a/src/risotto/controller.py b/src/risotto/controller.py index c4b3296..779b344 100644 --- a/src/risotto/controller.py +++ b/src/risotto/controller.py @@ -39,9 +39,9 @@ class Controller: **kwargs, ): """ a wrapper to dispatcher's publish""" - version, message = uri.split('.', 1) if args: raise ValueError(_(f'the URI "{uri}" can only be published with keyword arguments')) + version, message = uri.split('.', 1) await dispatcher.publish(version, message, risotto_context, diff --git a/src/risotto/dispatcher.py b/src/risotto/dispatcher.py index 08b2343..bff2ff5 100644 --- a/src/risotto/dispatcher.py +++ b/src/risotto/dispatcher.py @@ -4,6 +4,7 @@ try: except: from tiramisu import Config from tiramisu.error import ValueOptionError +from asyncio import get_event_loop, ensure_future from traceback import print_exc from copy import copy from typing import Dict, Callable, List, Optional @@ -15,7 +16,6 @@ from .logger import log from .config import get_config from .context import Context from . import register -from .remote import Remote class CallDispatcher: @@ -79,7 +79,7 @@ class CallDispatcher: """ execute the function associate with specified uri arguments are validate before """ - risotto_context = self.build_new_context(old_risotto_context, + risotto_context = self.build_new_context(old_risotto_context.__dict__, version, message, 'rpc', @@ -88,20 +88,35 @@ class CallDispatcher: raise CallError(_(f'cannot find version of message "{version}"')) if message not in self.messages[version]: raise CallError(_(f'cannot find message "{version}.{message}"')) - function_objs = [self.messages[version][message]] + function_obj = self.messages[version][message] # do not start a new database connection if hasattr(old_risotto_context, 'connection'): risotto_context.connection = old_risotto_context.connection - return await self.launch(version, - message, - risotto_context, - check_role, + await self.check_message_type(risotto_context, + kwargs, + ) + config_arguments = await self.load_kwargs_to_config(risotto_context, + f'{version}.{message}', + kwargs, + check_role, + internal, + ) + return await self.launch(risotto_context, kwargs, - function_objs, - internal, + config_arguments, + function_obj, ) else: try: + await self.check_message_type(risotto_context, + kwargs, + ) + config_arguments = await self.load_kwargs_to_config(risotto_context, + f'{version}.{message}', + kwargs, + check_role, + internal, + ) async with self.pool.acquire() as connection: await connection.set_type_codec( 'json', @@ -111,13 +126,10 @@ class CallDispatcher: ) risotto_context.connection = connection async with connection.transaction(): - return await self.launch(version, - message, - risotto_context, - check_role, + return await self.launch(risotto_context, kwargs, - function_objs, - internal, + config_arguments, + function_obj, ) except CallError as err: raise err @@ -139,66 +151,81 @@ class CallDispatcher: class PublishDispatcher: + async def register_remote(self) -> None: + print() + print(_('======== Registered remote event ========')) + self.listened_connection = await self.pool.acquire() + for version, messages in self.messages.items(): + for message, message_infos in messages.items(): + # event not emit locally + if message_infos['pattern'] == 'event' and 'functions' in message_infos and message_infos['functions']: +# module, submodule, submessage = message.split('.', 2) +# if f'{module}.{submodule}' not in self.injected_self: + uri = f'{version}.{message}' + print(f' - {uri}') + await self.listened_connection.add_listener(uri, + self.to_async_publish, + ) + async def publish(self, version: str, message: str, - old_risotto_context: Context, - check_role: bool=False, - internal: bool=True, + risotto_context: Context, **kwargs, ) -> None: - risotto_context = self.build_new_context(old_risotto_context, + if version not in self.messages or message not in self.messages[version]: + raise ValueError(_(f'cannot find URI "{version}.{message}"')) + + # publish to remote + remote_kw = dumps({'kwargs': kwargs, + 'context': {'username': risotto_context.username, + 'paths': risotto_context.paths, + } + }) + # FIXME should be better :/ + remote_kw = remote_kw.replace("'", "''") + await risotto_context.connection.execute(f'NOTIFY "{version}.{message}", \'{remote_kw}\'') + + def to_async_publish(self, + con: 'asyncpg.connection.Connection', + pid: int, + uri: str, + payload: str, + ) -> None: + version, message = uri.split('.', 1) + loop = get_event_loop() + remote_kw = loads(payload) + risotto_context = self.build_new_context(remote_kw['context'], version, message, 'event', ) - try: - function_objs = self.messages[version][message].get('functions', []) - except KeyError: - raise ValueError(_(f'cannot find message {version}.{message}')) - # do not start a new database connection - if hasattr(old_risotto_context, 'connection'): - # publish to remove - remote_kw = dumps({'kwargs': kwargs, - 'context': risotto_context.__dict__, - }) - risotto_context.connection = old_risotto_context.connection - # FIXME should be better :/ - remote_kw = remote_kw.replace("'", "''") - await risotto_context.connection.execute(f'NOTIFY "{version}.{message}", \'{remote_kw}\'') - return await self.launch(version, - message, - risotto_context, - check_role, - kwargs, - function_objs, - internal, - ) - async with self.pool.acquire() as connection: - try: - await connection.set_type_codec( - 'json', - encoder=dumps, - decoder=loads, - schema='pg_catalog' - ) - risotto_context.connection = connection - async with connection.transaction(): - return await self.launch(version, - message, - risotto_context, - check_role, - kwargs, - function_objs, - internal, - ) - except CallError as err: - pass - except Exception as err: - # if there is a problem with arguments, log and do nothing - if get_config()['global']['debug']: - print_exc() - async with self.pool.acquire() as connection: + callback = lambda: ensure_future(self._publish(version, + message, + risotto_context, + **remote_kw['kwargs'], + )) + loop.call_soon(callback) + + async def _publish(self, + version: str, + message: str, + risotto_context: Context, + **kwargs, + ) -> None: + config_arguments = await self.load_kwargs_to_config(risotto_context, + f'{version}.{message}', + kwargs, + False, + False, + ) + for function_obj in self.messages[version][message]['functions']: + print('======', function_obj['function'].__name__) + async with self.pool.acquire() as connection: + try: + await self.check_message_type(risotto_context, + kwargs, + ) await connection.set_type_codec( 'json', encoder=dumps, @@ -207,18 +234,37 @@ class PublishDispatcher: ) risotto_context.connection = connection async with connection.transaction(): - await log.error_msg(risotto_context, kwargs, err) + await self.launch(risotto_context, + kwargs, + config_arguments, + function_obj, + ) + except CallError as err: + pass + except Exception as err: + # if there is a problem with arguments, log and do nothing + if get_config()['global']['debug']: + print_exc() + async with self.pool.acquire() as connection: + await connection.set_type_codec( + 'json', + encoder=dumps, + decoder=loads, + schema='pg_catalog' + ) + risotto_context.connection = connection + async with connection.transaction(): + await log.error_msg(risotto_context, kwargs, err) class Dispatcher(register.RegisterDispatcher, - Remote, CallDispatcher, PublishDispatcher): """ Manage message (call or publish) so launch a function when a message is called """ def build_new_context(self, - old_risotto_context: Context, + context: dict, version: str, message: str, type: str, @@ -227,8 +273,8 @@ class Dispatcher(register.RegisterDispatcher, """ uri = version + '.' + message risotto_context = Context() - risotto_context.username = old_risotto_context.username - risotto_context.paths = copy(old_risotto_context.paths) + risotto_context.username = context['username'] + risotto_context.paths = copy(context['paths']) risotto_context.paths.append(uri) risotto_context.uri = uri risotto_context.type = type @@ -342,65 +388,56 @@ class Dispatcher(register.RegisterDispatcher, raise NotAllowedError(_(f'You ({user_login}) don\'t have any authorisation to access to "{uri}"')) async def launch(self, - version: str, - message: str, risotto_context: Context, - check_role: bool, kwargs: Dict, - function_objs: List, - internal: bool, + config_arguments: dict, + function_obj: Callable, ) -> Optional[Dict]: - await self.check_message_type(risotto_context, - kwargs) - config_arguments = await self.load_kwargs_to_config(risotto_context, - f'{version}.{message}', - kwargs, - check_role, - internal, - ) - # config is ok, so send the message - for function_obj in function_objs: - function = function_obj['function'] - submodule_name = function_obj['module'] - function_name = function.__name__ - risotto_context.module = submodule_name.split('.', 1)[0] - info_msg = _(f'in module {submodule_name}.{function_name}') - # build argument for this function - if risotto_context.type == 'rpc': - kw = config_arguments - else: - kw = {} - for key, value in config_arguments.items(): - if key in function_obj['arguments']: - kw[key] = value + # so send the message + function = function_obj['function'] + submodule_name = function_obj['module'] + function_name = function.__name__ + risotto_context.module = submodule_name.split('.', 1)[0] + info_msg = _(f'in module {submodule_name}.{function_name}') + # build argument for this function + if risotto_context.type == 'rpc': + kw = config_arguments + else: + kw = {} + for key, value in config_arguments.items(): + if key in function_obj['arguments']: + kw[key] = value - kw['risotto_context'] = risotto_context - returns = await function(self.injected_self[function_obj['module']], **kw) - if risotto_context.type == 'rpc': - # valid returns - await self.valid_call_returns(risotto_context, - function, - returns, - kwargs) - # log the success - await log.info_msg(risotto_context, - {'arguments': kwargs, - 'returns': returns}, - info_msg) - # notification - if function_obj.get('notification'): - notif_version, notif_message = function_obj['notification'].split('.', 1) - if not isinstance(returns, list): - send_returns = [returns] - else: - send_returns = returns - for ret in send_returns: - await self.publish(notif_version, - notif_message, - risotto_context, - **ret) - if risotto_context.type == 'rpc': - return returns + kw['risotto_context'] = risotto_context + returns = await function(self.injected_self[function_obj['module']], **kw) + if risotto_context.type == 'rpc': + # valid returns + await self.valid_call_returns(risotto_context, + function, + returns, + kwargs, + ) + # log the success + await log.info_msg(risotto_context, + {'arguments': kwargs, + 'returns': returns}, + info_msg, + ) + # notification + if function_obj.get('notification'): + notif_version, notif_message = function_obj['notification'].split('.', 1) + if not isinstance(returns, list): + send_returns = [returns] + else: + send_returns = returns + for ret in send_returns: + await self.publish(notif_version, + notif_message, + risotto_context, + **ret, + ) + if risotto_context.type == 'rpc': + return returns dispatcher = Dispatcher() diff --git a/src/risotto/register.py b/src/risotto/register.py index c2db5cb..c03275e 100644 --- a/src/risotto/register.py +++ b/src/risotto/register.py @@ -300,7 +300,7 @@ class RegisterDispatcher: ) if truncate: async with connection.transaction(): - await connection.execute('TRUNCATE InfraServer, InfraSite, InfraZone, Log, ProviderDeployment, ProviderFactoryCluster, ProviderFactoryClusterNode, SettingApplicationservice, SettingApplicationServiceDependency, SettingRelease, SettingServer, SettingServermodel, SettingSource, UserRole, UserRoleURI, UserURI, UserUser, InfraServermodel, ProviderZone, ProviderServer, ProviderSource, ProviderApplicationservice ProviderServermodel') + await connection.execute('TRUNCATE InfraServer, InfraSite, InfraZone, Log, ProviderDeployment, ProviderFactoryCluster, ProviderFactoryClusterNode, SettingApplicationservice, SettingApplicationServiceDependency, SettingRelease, SettingServer, SettingServermodel, SettingSource, UserRole, UserRoleURI, UserURI, UserUser, InfraServermodel, ProviderZone, ProviderServer, ProviderSource, ProviderApplicationservice, ProviderServermodel') async with connection.transaction(): for submodule_name, module in self.injected_self.items(): risotto_context = Context() diff --git a/src/risotto/remote.py b/src/risotto/remote.py deleted file mode 100644 index dc5d358..0000000 --- a/src/risotto/remote.py +++ /dev/null @@ -1,42 +0,0 @@ -from asyncio import get_event_loop, ensure_future -from json import loads - - -from .context import Context -from .config import get_config -from .utils import _ - - -class Remote: - async def register_remote(self) -> None: - print() - print(_('======== Registered remote event ========')) - self.listened_connection = await self.pool.acquire() - for version, messages in self.messages.items(): - for message, message_infos in messages.items(): - # event not emit locally - if message_infos['pattern'] == 'event': - module, submodule, submessage = message.split('.', 2) - if f'{module}.{submodule}' not in self.injected_self: - uri = f'{version}.{message}' - print(f' - {uri}') - await self.listened_connection.add_listener(uri, self.to_async_publish) - - def to_async_publish(self, - con: 'asyncpg.connection.Connection', - pid: int, - uri: str, - payload: str, - ) -> None: - version, message = uri.split('.', 1) - loop = get_event_loop() - remote_kw = loads(payload) - context = Context() - for key, value in remote_kw['context'].items(): - setattr(context, key, value) - callback = lambda: ensure_future(self.publish(version, - message, - context, - **remote_kw['kwargs'], - )) - loop.call_soon(callback)