From 0c642564d73a2088c62a8071a490722fbceca120 Mon Sep 17 00:00:00 2001 From: Emmanuel Garette Date: Thu, 28 Nov 2019 16:51:56 +0100 Subject: [PATCH] add event messages --- .../config.configuration.server.updated.yml | 0 src/risotto/config.py | 2 +- src/risotto/controller.py | 25 +- src/risotto/dispatcher.py | 265 ++++++++++++++---- src/risotto/http.py | 17 +- src/risotto/logger.py | 42 ++- src/risotto/message/message.py | 3 +- src/risotto/services/config/config.py | 7 +- 8 files changed, 285 insertions(+), 76 deletions(-) rename messages/v1/messages/{old => }/config.configuration.server.updated.yml (100%) diff --git a/messages/v1/messages/old/config.configuration.server.updated.yml b/messages/v1/messages/config.configuration.server.updated.yml similarity index 100% rename from messages/v1/messages/old/config.configuration.server.updated.yml rename to messages/v1/messages/config.configuration.server.updated.yml diff --git a/src/risotto/config.py b/src/risotto/config.py index bd3055d..2088b3b 100644 --- a/src/risotto/config.py +++ b/src/risotto/config.py @@ -1,3 +1,3 @@ HTTP_PORT = 8080 MESSAGE_ROOT_PATH = 'messages' -DEBUG = True +DEBUG = False diff --git a/src/risotto/controller.py b/src/risotto/controller.py index adc87dd..5af4561 100644 --- a/src/risotto/controller.py +++ b/src/risotto/controller.py @@ -1,7 +1,28 @@ from .dispatcher import dispatcher +from .context import Context class Controller: - async def call(self, uri, risotto_context, **kwargs): + """Common controller used to add a service in Risotto + """ + async def call(self, + uri: str, + risotto_context: Context, + **kwargs): + """ a wrapper to dispatcher's call""" version, uri = uri.split('.', 1) - return await dispatcher.call(version, uri, risotto_context, **kwargs) + return await dispatcher.call(version, + uri, + risotto_context, + **kwargs) + + async def publish(self, + uri: str, + risotto_context: Context, + **kwargs): + """ a wrapper to dispatcher's publish""" + version, uri = uri.split('.', 1) + await dispatcher.publish(version, + uri, + risotto_context, + **kwargs) diff --git a/src/risotto/dispatcher.py b/src/risotto/dispatcher.py index 448e12f..1e88c52 100644 --- a/src/risotto/dispatcher.py +++ b/src/risotto/dispatcher.py @@ -2,6 +2,7 @@ from tiramisu import Config from inspect import signature from traceback import print_exc from copy import copy +from typing import Dict from .utils import undefined, _ from .error import RegistrationError, CallError, NotAllowedError @@ -13,25 +14,15 @@ from .context import Context def register(uri: str, notification: str=undefined): + """ Decorator to register function to the dispatcher + """ version, uri = uri.split('.', 1) def decorator(function): dispatcher.set_function(version, uri, function) return decorator -class URI: - def __init__(self, ): - self.functions = {} - - -class Dispatcher: - def __init__(self): - self.modules = {} - self.uris = {} - self.function_names = {} - self.messages, self.option = get_messages() - config = Config(self.option) - +class RegisterDispatcher: def get_function_args(self, function): # remove self first_argument_index = 1 @@ -50,15 +41,21 @@ class Dispatcher: subconfig = config.option(uri) return set(config.option(uri).value.dict().keys()) + def get_function_args(): + function_args = self.get_function_args(function) + # risotto_context is a special argument, remove it + if function_args[0] == 'risotto_context': + function_args = function_args[1:] + return set(function_args) + + # get message arguments + message_args = get_message_args() + # get function arguments + function_args = get_function_args() # compare message arguments with function parameter # it must not have more or less arguments - message_args = get_message_args() - function_args = self.get_function_args(function) - # risotto_context is a special argument, remove it - if function_args[0] == 'risotto_context': - function_args = function_args[1:] - function_args = set(function_args) if message_args != function_args: + # raise if arguments are not equal msg = [] missing_function_args = message_args - function_args if missing_function_args: @@ -70,7 +67,38 @@ class Dispatcher: msg = _(' and ').join(msg) raise RegistrationError(_(f'error with {module_name}.{function_name} arguments: {msg}')) + def _valid_event_params(self, version, uri, function, module_name): + """ parameters function validation for event messages + """ + def get_message_args(): + # load config + config = Config(self.option) + config.property.read_write() + # set message to the uri name + config.option('message').value.set(uri) + # get message argument + subconfig = config.option(uri) + return set(config.option(uri).value.dict().keys()) + def get_function_args(): + function_args = self.get_function_args(function) + # risotto_context is a special argument, remove it + if function_args[0] == 'risotto_context': + function_args = function_args[1:] + return set(function_args) + + # get message arguments + message_args = get_message_args() + # get function arguments + function_args = get_function_args() + # compare message arguments with function parameter + # it can have less arguments but not more + extra_function_args = function_args - message_args + if extra_function_args: + # raise if too many arguments + function_name = function.__name__ + msg = _(f'extra arguments: {extra_function_args}') + raise RegistrationError(_(f'error with {module_name}.{function_name} arguments: {msg}')) def set_function(self, version, uri, function): """ register a function to an URI @@ -89,15 +117,10 @@ class Dispatcher: except AttributeError: raise RegistrationError(_(f'{uri} is not a valid message')) - # check if a function is already register for this uri + # create an uris' version if needed if version not in self.uris: self.uris[version] = {} self.function_names[version] = {} - if uri in self.uris[version]: - raise RegistrationError(_(f'uri {uri} already imported')) - - # valid function's arguements - self._valid_rpc_params(version, uri, function, module_name) # valid function is unique per module if module_name not in self.function_names[version]: @@ -108,15 +131,37 @@ class Dispatcher: self.function_names[version][module_name].append(function_name) # True if first argument is the risotto_context - inject_risotto_context = self.get_function_args(function)[0] == 'risotto_context' + function_args = self.get_function_args(function) + if function_args[0] == 'risotto_context': + inject_risotto_context = True + function_args.pop(0) + else: + inject_risotto_context = False - # register this function - self.uris[version][uri] = {'module': module_name, - 'function': function, - 'risotto_context': inject_risotto_context} + if self.messages[uri]['pattern'] == 'rpc': + # check if a RPC function is already register for this uri + if uri in self.uris[version]: + raise RegistrationError(_(f'uri {uri} already registered')) + # valid function's arguments + self._valid_rpc_params(version, uri, function, module_name) + # register this function + self.uris[version][uri] = {'module': module_name, + 'function': function, + 'risotto_context': inject_risotto_context} + else: + # if event + # valid function's arguments + self._valid_event_params(version, uri, function, module_name) + # register this function + if uri not in self.uris[version]: + self.uris[version][uri] = [] + self.uris[version][uri].append({'module': module_name, + 'function': function, + 'arguments': function_args, + 'risotto_context': inject_risotto_context}) def set_module(self, module_name, module): - """ register a new module + """ register and instanciate a new module """ try: self.modules[module_name] = module.Risotto() @@ -131,33 +176,96 @@ class Dispatcher: if missing_messages: raise RegistrationError(_(f'missing uri {missing_messages}')) + +class Dispatcher(RegisterDispatcher): + """ Manage message (call or publish) + so launch a function when a message is called + """ + def __init__(self): + self.modules = {} + self.uris = {} + self.function_names = {} + self.messages, self.option = get_messages() + config = Config(self.option) + + def new_context(self, + context: Context, + version: str, + uri: str): + new_context = Context() + new_context.paths = copy(context.paths) + new_context.paths.append(version + '.' + uri) + new_context.username = context.username + return new_context + + def check_public_function(self, + version: str + uri: str, + context: Context, + kwargs: Dict, + public_only: bool): + if public_only and not self.messages[uri]['public']: + msg = _(f'the message {version}.{uri} is private') + log.error_msg(version, uri, context, kwargs, 'call', msg) + raise NotAllowedError(msg) + + def check_pattern(self, + version: str + uri: str, + type: str, + context: Context, + kwargs: Dict): + if self.messages[uri]['pattern'] != type: + msg = _(f'{version}.{uri} is not a {type} message') + log.error_msg(version, uri, context, kwargs, 'call', msg) + raise CallError(msg) + + def set_config(self, + uri: str, + kwargs: Dict): + """ create a new Config et set values to it + """ + # create a new config + config = Config(self.option) + config.property.read_write() + # set message option + config.option('message').value.set(uri) + # store values + subconfig = config.option(uri) + for key, value in kwargs.items(): + try: + subconfig.option(key).value.set(value) + except AttributeError: + raise AttributeError(_(f'unknown parameter "{key}"')) + # check mandatories options + config.property.read_only() + mandatories = list(config.value.mandatory()) + if mandatories: + mand = [mand.split('.')[-1] for mand in mandatories] + raise ValueError(_(f'missing parameters: {mand}')) + # return the config + return config + async def call(self, version, uri, risotto_context, public_only=False, **kwargs): """ execute the function associate with specified uri arguments are validate before """ - new_context = Context() - new_context.paths = copy(risotto_context.paths) - new_context.paths.append(version + '.' + uri) - new_context.username = risotto_context.username - if public_only and not self.messages[uri]: - raise NotAllowedError() + new_context = self.new_context(risotto_context, + version, + uri) + self.check_public_function(version, + uri, + new_context, + kwargs, + public_only) + self.check_pattern(version, + uri, + 'rpc', + new_context, + kwargs) try: - config = Config(self.option) - config.property.read_write() - config.option('message').value.set(uri) - subconfig = config.option(uri) - for key, value in kwargs.items(): - try: - subconfig.option(key).value.set(value) - except ValueError as err: - raise ValueError(str(err)) - except AttributeError: - raise AttributeError(_(f'unknown parameter "{key}"')) - config.property.read_only() - mandatories = list(config.value.mandatory()) - if mandatories: - mand = [mand.split('.')[-1] for mand in mandatories] - raise ValueError(_(f'missing parameters: {mand}')) + config = self.set_config(uri, + kwargs) obj = self.uris[version][uri] kw = config.option(uri).value.dict() if obj['risotto_context']: @@ -168,11 +276,58 @@ class Dispatcher: except Exception as err: if DEBUG: print_exc() - log.error_msg(version, uri, new_context, kwargs, err) + log.error_msg(version, uri, new_context, kwargs, 'call', err) raise CallError(str(err)) - log.info_msg(version, uri, new_context, kwargs, returns) + log.info_msg(version, uri, new_context, kwargs, 'call', _(f'returns {returns}')) return returns + async def publish(self, version, uri, risotto_context, public_only=False, **kwargs): + new_context = self.new_context(risotto_context, + version, + uri) + self.check_pattern(version, + uri, + 'event', + new_context, + kwargs) + try: + config = self.set_config(uri, + kwargs) + config_arguments = config.option(uri).value.dict() + except CallError as err: + return + except Exception as err: + # if there is a problem with arguments, just send an error et do nothing + if DEBUG: + print_exc() + log.error_msg(version, uri, new_context, kwargs, 'publish', err) + return + + # config is ok, so publish the message + for function_obj in self.uris[version][uri]: + function = function_obj['function'] + module_name = function.__module__.split('.')[-2] + function_name = function.__name__ + info_msg = _(f'in module {module_name}.{function_name}') + try: + # build argument for this function + kw = {} + for key, value in config_arguments.items(): + if key in function_obj['arguments']: + kw[key] = value + if function_obj['risotto_context']: + kw['risotto_context'] = new_context + # send event + await function(self.modules[function_obj['module']], **kw) + except Exception as err: + if DEBUG: + print_exc() + log.error_msg(version, uri, new_context, kwargs, 'publish', err, info_msg) + else: + module_name = function.__module__.split('.')[-2] + function_name = function.__name__ + log.info_msg(version, uri, new_context, kwargs,'publish', info_msg) + dispatcher = Dispatcher() diff --git a/src/risotto/http.py b/src/risotto/http.py index bd1ca48..b64cad1 100644 --- a/src/risotto/http.py +++ b/src/risotto/http.py @@ -10,18 +10,24 @@ async def handle(request): context = Context() context.username = request.match_info.get('username', "Anonymous") try: - text = await dispatcher.call(version, uri, context, id=2, public_only=True) + text = await dispatcher.call(version, + uri, + context, + # FIXME + id=2, + public_only=True) except NotAllowedError as err: - raise HTTPNotFound(reason=_(f'the message {version}.{uri} is private')) + raise HTTPNotFound(reason=str(err)) except CallError as err: raise HTTPBadRequest(reason=str(err)) except Exception as err: raise HTTPInternalServerError(reason=str(err)) - return Response(text=str(text)) def get_app(): + """ build all routes + """ app = Application() routes = [] uris = list(dispatcher.uris.items()) @@ -31,10 +37,11 @@ def get_app(): print(_('======== Registered messages ========')) for uri in uris: web_uri = f'/{version}/{uri}' - if dispatcher.messages[uri]: + if dispatcher.messages[uri]['public']: print(f' - {web_uri}') else: - print(f' - {web_uri} (private)') + pattern = dispatcher.messages[uri]['pattern'] + print(f' - {web_uri} (private {pattern})') routes.append(get(web_uri, handle)) print() app.add_routes(routes) diff --git a/src/risotto/logger.py b/src/risotto/logger.py index ba3f0ed..445b5c2 100644 --- a/src/risotto/logger.py +++ b/src/risotto/logger.py @@ -1,23 +1,47 @@ +from typing import Dict +from .context import Context from .utils import _ class Logger: - def _get_message_paths(self, risotto_context): + """ An object to manager log + FIXME should add event to a database + """ + def _get_message_paths(self, + risotto_context: Context, + type: str): paths = risotto_context.paths if len(paths) == 1: - paths_msg = f'messages called: {paths[0]}' + paths_msg = f'messages {type}ed: {paths[0]}' else: - paths_msg = f'sub-messages called: ' + paths_msg = f'sub-messages {type}ed: ' paths_msg += ' > '.join(paths) return paths_msg - def error_msg(self, version, message, risotto_context, arguments, error): - paths_msg = self._get_message_paths(risotto_context) - print(_(f'{risotto_context.username}: {error} ({paths_msg} with arguments "{arguments}")')) + def error_msg(self, + version: 'str', + message: 'str', + risotto_context: Context, + arguments, + type: str, + error: str, + msg: str=''): + """ send message when an error append + """ + paths_msg = self._get_message_paths(risotto_context, type) + print(_(f'{risotto_context.username}: ERROR: {error} ({paths_msg} with arguments "{arguments}" {msg})')) - def info_msg(self, version, message, risotto_context, arguments, returns): - paths_msg = self._get_message_paths(risotto_context) - print(_(f'{risotto_context.username}: {paths_msg} with arguments "{arguments}" returns {returns}')) + def info_msg(self, + version: 'str', + message: 'str', + risotto_context: Context, + arguments: Dict, + type: str, + msg: str=''): + """ send message with common information + """ + paths_msg = self._get_message_paths(risotto_context, type) + print(_(f'{risotto_context.username}: INFO: {paths_msg} with arguments "{arguments}" {msg}')) log = Logger() diff --git a/src/risotto/message/message.py b/src/risotto/message/message.py index f166362..208224e 100644 --- a/src/risotto/message/message.py +++ b/src/risotto/message/message.py @@ -584,7 +584,8 @@ def get_messages(load_shortarg=False, only_public=False): if message_def.pattern not in ['rpc', 'event'] or \ (not message_def.public and only_public): continue - optiondescriptions_info[message_def.uri] = message_def.public + optiondescriptions_info[message_def.uri] = {'pattern': message_def.pattern, + 'public': message_def.public} version = message_name.split('.')[0] if message_def.uri in responses: raise Exception('uri {} allready loader'.format(message_def.uri)) diff --git a/src/risotto/services/config/config.py b/src/risotto/services/config/config.py index ce38696..d86950d 100644 --- a/src/risotto/services/config/config.py +++ b/src/risotto/services/config/config.py @@ -3,13 +3,14 @@ from ...dispatcher import register class Risotto(Controller): - # @register('v1.config.created') - async def server_created(self, serverid, servername, servermodelid): - print('pouet') + @register('v1.config.configuration.server.updated') + async def server_created(self, server_id): + print('pouet ' + str(server_id)) @register('v1.config.session.server.start', None) async def get_configuration(self, risotto_context, id): stop = await self.call('v1.config.session.server.stop', risotto_context, sessionid=str(id)) + await self.publish('v1.config.configuration.server.updated', risotto_context, server_id=1, deploy=True) return {'start': id} @register('v1.config.session.server.stop', None)