From 30a267bf4a14c716789e035a0840388fb5f88336 Mon Sep 17 00:00:00 2001 From: Emmanuel Garette Date: Sat, 24 Apr 2021 10:10:54 +0200 Subject: [PATCH 1/3] add TiramisuController --- src/risotto/controller.py | 256 +++++++++++++++++++++++++++++++++++++- src/risotto/utils.py | 20 ++- 2 files changed, 273 insertions(+), 3 deletions(-) diff --git a/src/risotto/controller.py b/src/risotto/controller.py index 0c4caa2..bb4313b 100644 --- a/src/risotto/controller.py +++ b/src/risotto/controller.py @@ -1,6 +1,22 @@ +from os import listdir, makedirs +from os.path import join, isdir, isfile +from shutil import rmtree +from traceback import print_exc +from typing import Dict +from rougail import RougailConvert, RougailConfig, RougailUpgrade +try: + from tiramisu3 import Storage, Config +except: + from tiramisu import Storage, Config + +from .config import get_config +from .utils import _, tiramisu_display_name +from .logger import log from .dispatcher import dispatcher from .context import Context -from .utils import _ + + +RougailConfig['variable_namespace'] = 'configuration' class Controller: @@ -8,7 +24,7 @@ class Controller: """ def __init__(self, test: bool, - ): + ) -> None: pass async def call(self, @@ -77,3 +93,239 @@ class Controller: risotto_context, ): pass + + +class TiramisuController(Controller): + def __init__(self, + test: bool, + ) -> None: + if not 'dataset_name' in vars(self): + raise Exception(f'please specify "dataset_name" to "{self.__class__.__name__}"') + self.tiramisu_cache_root_path = join(get_config()['cache']['root_path'], self.dataset_name) + if not test: + db_conf = get_config()['database']['tiramisu_dsn'] + self.save_storage = Storage(engine='postgres') + self.save_storage.setting(dsn=db_conf) + if self.dataset_name != 'servermodel': + self.optiondescription = None + dispatcher.set_function('v1.setting.dataset.updated', + None, + TiramisuController.dataset_updated, + self.__class__.__module__, + ) + + async def on_join(self, + risotto_context: Context, + ) -> None: + if isdir(self.tiramisu_cache_root_path): + await self.load_datas(risotto_context) + + async def dataset_updated(self, + risotto_context: Context, + ) -> Dict: + await self.gen_dictionaries(risotto_context) + await self.load_datas(risotto_context) + + async def gen_dictionaries(self, + risotto_context: Context, + ) -> None: + sources = await self.get_sources(risotto_context) + self._aggregate_tiramisu_funcs(sources) + self._convert_dictionaries_to_tiramisu(sources) + + async def get_sources(self, + risotto_context: Context, + ) -> None: + return await self.call('v1.setting.source.list', + risotto_context, + ) + + def _aggregate_tiramisu_funcs(self, + sources: list, + ) -> None: + dest_file = join(self.tiramisu_cache_root_path, 'funcs.py') + if not isdir(self.tiramisu_cache_root_path): + makedirs(self.tiramisu_cache_root_path) + with open(dest_file, 'wb') as funcs: + funcs.write(b"""try: + from tiramisu3 import valid_network_netmask, valid_ip_netmask, valid_broadcast, valid_in_network, valid_not_equal as valid_differ, valid_not_equal, calc_value +except: + from tiramisu import valid_network_netmask, valid_ip_netmask, valid_broadcast, valid_in_network, valid_not_equal as valid_differ, valid_not_equal, calc_value + +""") + for source in sources: + root_path = join(source['source_directory'], + self.dataset_name, + ) + if not isdir(root_path): + continue + for service in listdir(root_path): + path = join(root_path, + service, + 'funcs', + ) + if not isdir(path): + continue + for filename in listdir(path): + if not filename.endswith('.py'): + continue + filename_path = join(path, filename) + with open(filename_path, 'rb') as fh: + funcs.write(f'# {filename_path}\n'.encode()) + funcs.write(fh.read()) + funcs.write(b'\n') + + def _convert_dictionaries_to_tiramisu(self, sources: list) -> None: + funcs_file = join(self.tiramisu_cache_root_path, 'funcs.py') + tiramisu_file = join(self.tiramisu_cache_root_path, 'tiramisu.py') + dictionaries_dir = join(self.tiramisu_cache_root_path, 'dictionaries') + extras_dictionaries_dir = join(self.tiramisu_cache_root_path, 'extra_dictionaries') + if isdir(dictionaries_dir): + rmtree(dictionaries_dir) + makedirs(dictionaries_dir) + if isdir(extras_dictionaries_dir): + rmtree(extras_dictionaries_dir) + makedirs(extras_dictionaries_dir) + extras = [] + upgrade = RougailUpgrade() + for source in sources: + root_path = join(source['source_directory'], + self.dataset_name, + ) + if not isdir(root_path): + continue + for service in listdir(root_path): + # upgrade dictionaries + path = join(root_path, + service, + 'dictionaries', + ) + if not isdir(path): + continue + upgrade.load_xml_from_folders(path, + dictionaries_dir, + RougailConfig['variable_namespace'], + ) + for service in listdir(root_path): + # upgrade extra dictionaries + path = join(root_path, + service, + 'extras', + ) + if not isdir(path): + continue + for namespace in listdir(path): + extra_dir = join(path, + namespace, + ) + if not isdir(extra_dir): + continue + extra_dictionaries_dir = join(extras_dictionaries_dir, + namespace, + ) + if not isdir(extra_dictionaries_dir): + makedirs(extra_dictionaries_dir) + extras.append((namespace, [extra_dictionaries_dir])) + upgrade.load_xml_from_folders(extra_dir, + extra_dictionaries_dir, + namespace, + ) + del upgrade + config = RougailConfig.copy() + config['functions_file'] = funcs_file + config['dictionaries_dir'] = [dictionaries_dir] + config['extra_dictionaries'] = {} + for extra in extras: + config['extra_dictionaries'][extra[0]] = extra[1] + eolobj = RougailConvert(rougailconfig=config) + eolobj.save(tiramisu_file) + + async def load(self, + risotto_context: Context, + name: str, + to_deploy: bool=False, + ) -> Config: + if self.optiondescription is None: + # use file in cache + tiramisu_file = join(self.tiramisu_cache_root_path, 'tiramisu.py') + if not isfile(tiramisu_file): + raise Exception(_(f'unable to load the "{self.dataset_name}" configuration, is dataset loaded?')) + with open(tiramisu_file) as fileio: + tiramisu_locals = {} + try: + exec(fileio.read(), None, tiramisu_locals) + except Exception as err: + raise Exception(_(f'unable to load tiramisu file {tiramisu_file}: {err}')) + + self.optiondescription = tiramisu_locals['option_0'] + del tiramisu_locals + try: + letter = self.dataset_name[0] + if not to_deploy: + session_id = f'{letter}_{name}' + else: + session_id = f'{letter}td_{name}' + config = await Config(self.optiondescription, + session_id=session_id, + storage=self.save_storage, + display_name=tiramisu_display_name, + ) + # change default rights + await config.property.read_only() + await config.permissive.add('basic') + await config.permissive.add('normal') + await config.permissive.add('expert') + + # set information and owner + await config.owner.set(session_id) + await config.information.set(f'{self.dataset_name}_name', name) + except Exception as err: + if get_config()['global']['debug']: + print_exc() + msg = _(f'unable to load config for {self.dataset_name} "{name}": {err}') + await log.error_msg(risotto_context, + None, + msg, + ) + return config + + async def _deploy_configuration(self, + dico: dict, + ) -> None: + config_std = dico['config_to_deploy'] + config = dico['config'] + # when deploy, calculate force_store_value + ro = await config_std.property.getdefault('read_only', 'append') + if 'force_store_value' not in ro: + await config_std.property.read_write() + if self.dataset_name == 'servermodel': + # server_deployed should be hidden + await config_std.forcepermissive.option('configuration.general.server_deployed').value.set(True) + ro = frozenset(list(ro) + ['force_store_value']) + rw = await config_std.property.getdefault('read_write', 'append') + rw = frozenset(list(rw) + ['force_store_value']) + await config_std.property.setdefault(ro, 'read_only', 'append') + await config_std.property.setdefault(rw, 'read_write', 'append') + await config_std.property.read_only() + + # copy informations from 'to deploy' configuration to configuration + await config.value.importation(await config_std.value.exportation()) + await config.permissive.importation(await config_std.permissive.exportation()) + await config.property.importation(await config_std.property.exportation()) + + async def build_configuration(self, + config: Config, + ) -> dict: + configuration = {} + for option in await config.option.list('optiondescription'): + name = await option.option.name() + if name == 'services': + continue + if name == RougailConfig['variable_namespace']: + fullpath = False + flatten = True + else: + fullpath = True + flatten = False + configuration.update(await option.value.dict(leader_to_list=True, fullpath=fullpath, flatten=flatten)) + return configuration diff --git a/src/risotto/utils.py b/src/risotto/utils.py index 7282708..4a497ea 100644 --- a/src/risotto/utils.py +++ b/src/risotto/utils.py @@ -1,9 +1,27 @@ class Undefined: pass +undefined = Undefined() def _(s): return s -undefined = Undefined() +def tiramisu_display_name(kls, + dyn_name: 'Base'=None, + suffix: str=None, + ) -> str: + if dyn_name is not None: + name = dyn_name + else: + name = kls.impl_getname() + doc = kls.impl_get_information('doc', None) + if doc: + doc = str(doc) + if doc.endswith('.'): + doc = doc[:-1] + if suffix: + doc += suffix + if name != doc: + name += f' ({doc})' + return name From 19240489dbae6b0cbcbec197c1a1dafc5643bd0b Mon Sep 17 00:00:00 2001 From: Emmanuel Garette Date: Sat, 24 Apr 2021 10:12:32 +0200 Subject: [PATCH 2/3] add http static support --- src/risotto/http.py | 30 ++++++++++++++++++++++++------ 1 file changed, 24 insertions(+), 6 deletions(-) diff --git a/src/risotto/http.py b/src/risotto/http.py index f501397..b5779b3 100644 --- a/src/risotto/http.py +++ b/src/risotto/http.py @@ -1,4 +1,4 @@ -from aiohttp.web import Application, Response, get, post, HTTPBadRequest, HTTPInternalServerError, HTTPNotFound +from aiohttp.web import Application, Response, get, post, HTTPBadRequest, HTTPInternalServerError, HTTPNotFound, static from json import dumps from traceback import print_exc try: @@ -12,12 +12,13 @@ from .utils import _ from .context import Context from .error import CallError, NotAllowedError, RegistrationError from .message import get_messages -from .logger import log +#from .logger import log from .config import get_config from . import services extra_routes = {} +extra_statics = {} def create_context(request): @@ -35,12 +36,21 @@ def register(version: str, """ def decorator(function): if path in extra_routes: - raise RegistrationError(f'the route {path} is already registered') + raise RegistrationError(f'the route "{path}" is already registered') extra_routes[path] = {'function': function, - 'version': version} + 'version': version, + } return decorator +def register_static(path: str, + directory: str, + ) -> None: + if path in extra_statics: + raise RegistrationError(f'the static path "{path}" is already registered') + extra_statics[path] = directory + + class extra_route_handler: async def __new__(cls, request, @@ -70,7 +80,8 @@ class extra_route_handler: # await log.info_msg(kwargs['risotto_context'], # dict(request.match_info)) return Response(text=dumps(returns), - content_type='application/json') + content_type='application/json', + ) async def handle(request): @@ -135,7 +146,7 @@ async def api(request, async def get_app(loop): """ build all routes """ - global extra_routes + global extra_routes, extra_statics services.link_to_dispatcher(dispatcher) app = Application(loop=loop) routes = [] @@ -175,7 +186,14 @@ async def get_app(loop): extra_handler = type(path, (extra_route_handler,), extra) routes.append(get(path, extra_handler)) print(f' - {path} (http_get)') + if extra_statics: + if not extra_routes: + print(_('======== Registered static routes ========')) + for path, directory in extra_statics.items(): + routes.append(static(path, directory)) + print(f' - {path} (static)') del extra_routes + del extra_statics app.router.add_routes(routes) await dispatcher.register_remote() print() From 4c83e6d89d77b86f8f10cc4e5b070e037b2b754b Mon Sep 17 00:00:00 2001 From: Emmanuel Garette Date: Sat, 24 Apr 2021 10:12:39 +0200 Subject: [PATCH 3/3] better log support --- sql/risotto.sql | 16 +++- src/risotto/config.py | 7 +- src/risotto/context.py | 2 + src/risotto/dispatcher.py | 166 +++++++++++++++++++++++--------------- src/risotto/logger.py | 164 +++++++++++++++++++++++++++++++------ src/risotto/register.py | 21 +++-- 6 files changed, 274 insertions(+), 102 deletions(-) diff --git a/sql/risotto.sql b/sql/risotto.sql index ad58260..0df932d 100644 --- a/sql/risotto.sql +++ b/sql/risotto.sql @@ -1,8 +1,16 @@ -CREATE TABLE log( +CREATE TABLE RisottoLog( + LogId SERIAL PRIMARY KEY, Msg VARCHAR(255) NOT NULL, + URI VARCHAR(255), + URIS VARCHAR(255), + UserLogin VARCHAR(100) NOT NULL, Level VARCHAR(10) NOT NULL, - Path VARCHAR(255), - Username VARCHAR(100) NOT NULL, + ContextId INTEGER, Data JSON, - Date timestamp DEFAULT current_timestamp + Returns JSON, + StartDate timestamp DEFAULT current_timestamp, + StopDate timestamp ); +CREATE INDEX RisottoLog_ContextId_index ON RisottoLog(ContextId); +CREATE INDEX RisottoLog_Login_index ON RisottoLog(UserLogin); +CREATE INDEX RisottoLog_URI_index ON RisottoLog(URI); diff --git a/src/risotto/config.py b/src/risotto/config.py index b87ba81..451cbeb 100644 --- a/src/risotto/config.py +++ b/src/risotto/config.py @@ -17,6 +17,10 @@ if 'RISOTTO_PORT' in environ: RISOTTO_PORT = environ['RISOTTO_PORT'] else: RISOTTO_PORT = config.get('RISOTTO_PORT', 8080) +if 'RISOTTO_URL' in environ: + RISOTTO_URL = environ['RISOTTO_URL'] +else: + RISOTTO_URL = config.get('RISOTTO_URL', 'http://localhost:8080/') if 'CONFIGURATION_DIR' in environ: CONFIGURATION_DIR = environ['CONFIGURATION_DIR'] else: @@ -135,7 +139,8 @@ _config = {'database': {'dsn': dsn_factory(RISOTTO_DB_NAME, RISOTTO_DB_USER, RIS 'celery_dsn': dsn_factory(CELERYRISOTTO_DB_NAME, CELERYRISOTTO_DB_USER, CELERYRISOTTO_DB_PASSWORD) }, 'http_server': {'port': RISOTTO_PORT, - 'default_user': DEFAULT_USER}, + 'default_user': DEFAULT_USER, + 'url': RISOTTO_URL}, 'global': {'message_root_path': MESSAGE_PATH, 'configurations_dir': CONFIGURATION_DIR, 'debug': True, diff --git a/src/risotto/context.py b/src/risotto/context.py index 835b035..fea3627 100644 --- a/src/risotto/context.py +++ b/src/risotto/context.py @@ -1,3 +1,5 @@ class Context: def __init__(self): self.paths = [] + self.context_id = None + self.start_id = None diff --git a/src/risotto/dispatcher.py b/src/risotto/dispatcher.py index 49b01fa..e0361a0 100644 --- a/src/risotto/dispatcher.py +++ b/src/risotto/dispatcher.py @@ -30,14 +30,11 @@ class CallDispatcher: if response.impl_get_information('multi'): if not isinstance(returns, list): err = _(f'function {module_name}.{function_name} has to return a list') - await log.error_msg(risotto_context, kwargs, err) - raise CallError(str(err)) + raise CallError(err) else: if not isinstance(returns, dict): - await log.error_msg(risotto_context, kwargs, returns) err = _(f'function {module_name}.{function_name} has to return a dict') - await log.error_msg(risotto_context, kwargs, err) - raise CallError(str(err)) + raise CallError(err) returns = [returns] if response is None: raise Exception('hu?') @@ -50,12 +47,10 @@ class CallDispatcher: await config.option(key).value.set(value) except AttributeError: err = _(f'function {module_name}.{function_name} return the unknown parameter "{key}" for the uri "{risotto_context.version}.{risotto_context.message}"') - await log.error_msg(risotto_context, kwargs, err) - raise CallError(str(err)) + raise CallError(err) except ValueError: err = _(f'function {module_name}.{function_name} return the parameter "{key}" with an unvalid value "{value}" for the uri "{risotto_context.version}.{risotto_context.message}"') - await log.error_msg(risotto_context, kwargs, err) - raise CallError(str(err)) + raise CallError(err) await config.property.read_only() mandatories = await config.value.mandatory() if mandatories: @@ -65,8 +60,7 @@ class CallDispatcher: await config.value.dict() except Exception as err: err = _(f'function {module_name}.{function_name} return an invalid response {err} for the uri "{risotto_context.version}.{risotto_context.message}"') - await log.error_msg(risotto_context, kwargs, err) - raise CallError(str(err)) + raise CallError(err) async def call(self, version: str, @@ -89,9 +83,16 @@ class CallDispatcher: if message not in self.messages[version]: raise CallError(_(f'cannot find message "{version}.{message}"')) function_obj = self.messages[version][message] - # do not start a new database connection + # log + function_name = function_obj['function'].__name__ + info_msg = _(f"call function {function_obj['full_module_name']}.{function_name}") if hasattr(old_risotto_context, 'connection'): + # do not start a new database connection risotto_context.connection = old_risotto_context.connection + await log.start(risotto_context, + kwargs, + info_msg, + ) await self.check_message_type(risotto_context, kwargs, ) @@ -101,22 +102,19 @@ class CallDispatcher: check_role, internal, ) - return await self.launch(risotto_context, - kwargs, - config_arguments, - function_obj, - ) + try: + ret = await self.launch(risotto_context, + kwargs, + config_arguments, + function_obj, + ) + except CallError as err: + await log.failed(risotto_context, + str(err), + ) + raise err from err else: try: - await self.check_message_type(risotto_context, - kwargs, - ) - config_arguments = await self.load_kwargs_to_config(risotto_context, - f'{version}.{message}', - kwargs, - check_role, - internal, - ) async with self.pool.acquire() as connection: await connection.set_type_codec( 'json', @@ -126,13 +124,40 @@ class CallDispatcher: ) risotto_context.connection = connection async with connection.transaction(): - return await self.launch(risotto_context, - kwargs, - config_arguments, - function_obj, - ) + try: + await log.start(risotto_context, + kwargs, + info_msg, + ) + await self.check_message_type(risotto_context, + kwargs, + ) + config_arguments = await self.load_kwargs_to_config(risotto_context, + f'{version}.{message}', + kwargs, + check_role, + internal, + ) + ret = await self.launch(risotto_context, + kwargs, + config_arguments, + function_obj, + ) + # log the success + await log.success(risotto_context, + ret, + ) + if not internal and isinstance(ret, dict): + ret['context_id'] = risotto_context.context_id + except CallError as err: + if get_config()['global']['debug']: + print_exc() + await log.failed(risotto_context, + str(err), + ) + raise err from err except CallError as err: - raise err + raise err from err except Exception as err: # if there is a problem with arguments, just send an error and do nothing if get_config()['global']['debug']: @@ -146,8 +171,11 @@ class CallDispatcher: ) risotto_context.connection = connection async with connection.transaction(): - await log.error_msg(risotto_context, kwargs, err) - raise err + await log.failed(risotto_context, + str(err), + ) + raise err from err + return ret class PublishDispatcher: @@ -180,6 +208,7 @@ class PublishDispatcher: remote_kw = dumps({'kwargs': kwargs, 'context': {'username': risotto_context.username, 'paths': risotto_context.paths, + 'context_id': risotto_context.context_id, } }) # FIXME should be better :/ @@ -219,26 +248,41 @@ class PublishDispatcher: False, False, ) - for function_obj in self.messages[version][message]['functions']: - async with self.pool.acquire() as connection: + async with self.pool.acquire() as connection: + await connection.set_type_codec( + 'json', + encoder=dumps, + decoder=loads, + schema='pg_catalog' + ) + risotto_context.connection = connection + for function_obj in self.messages[version][message]['functions']: + function_name = function_obj['function'].__name__ + info_msg = _(f"call function {function_obj['full_module_name']}.{function_name}") try: - await self.check_message_type(risotto_context, - kwargs, - ) - await connection.set_type_codec( - 'json', - encoder=dumps, - decoder=loads, - schema='pg_catalog' - ) - risotto_context.connection = connection async with connection.transaction(): - await self.launch(risotto_context, - kwargs, - config_arguments, - function_obj, - ) - except CallError as err: + try: + await log.start(risotto_context, + kwargs, + info_msg, + ) + await self.check_message_type(risotto_context, + kwargs, + ) + await self.launch(risotto_context, + kwargs, + config_arguments, + function_obj, + ) + # log the success + await log.success(risotto_context) + except CallError as err: + if get_config()['global']['debug']: + print_exc() + await log.failed(risotto_context, + str(err), + ) + except CallError: pass except Exception as err: # if there is a problem with arguments, log and do nothing @@ -253,12 +297,15 @@ class PublishDispatcher: ) risotto_context.connection = connection async with connection.transaction(): - await log.error_msg(risotto_context, kwargs, err) + await log.failed(risotto_context, + str(err), + ) class Dispatcher(register.RegisterDispatcher, CallDispatcher, - PublishDispatcher): + PublishDispatcher, + ): """ Manage message (call or publish) so launch a function when a message is called """ @@ -274,6 +321,7 @@ class Dispatcher(register.RegisterDispatcher, risotto_context = Context() risotto_context.username = context['username'] risotto_context.paths = copy(context['paths']) + risotto_context.context_id = context['context_id'] risotto_context.paths.append(uri) risotto_context.uri = uri risotto_context.type = type @@ -287,7 +335,6 @@ class Dispatcher(register.RegisterDispatcher, ) -> None: if self.messages[risotto_context.version][risotto_context.message]['pattern'] != risotto_context.type: msg = _(f'{risotto_context.uri} is not a {risotto_context.type} message') - await log.error_msg(risotto_context, kwargs, msg) raise CallError(msg) async def load_kwargs_to_config(self, @@ -396,8 +443,6 @@ class Dispatcher(register.RegisterDispatcher, # so send the message function = function_obj['function'] risotto_context.module = function_obj['module'].split('.', 1)[0] - function_name = function.__name__ - info_msg = _(f"in function {function_obj['full_module_name']}.{function_name}") # build argument for this function if risotto_context.type == 'rpc': kw = config_arguments @@ -408,6 +453,7 @@ class Dispatcher(register.RegisterDispatcher, kw[key] = value kw['risotto_context'] = risotto_context + # launch returns = await function(self.get_service(function_obj['module']), **kw) if risotto_context.type == 'rpc': # valid returns @@ -416,12 +462,6 @@ class Dispatcher(register.RegisterDispatcher, returns, kwargs, ) - # log the success - await log.info_msg(risotto_context, - {'arguments': kwargs, - 'returns': returns}, - info_msg, - ) # notification if function_obj.get('notification'): notif_version, notif_message = function_obj['notification'].split('.', 1) diff --git a/src/risotto/logger.py b/src/risotto/logger.py index dcbc94e..62bb058 100644 --- a/src/risotto/logger.py +++ b/src/risotto/logger.py @@ -1,7 +1,7 @@ -from typing import Dict, Any -from json import dumps +from typing import Dict, Any, Optional +from json import dumps, loads from asyncpg.exceptions import UndefinedTableError - +from datetime import datetime from .context import Context from .utils import _ @@ -13,26 +13,79 @@ class Logger: """ async def insert(self, msg: str, - path: str, - risotto_context: str, + uri: str, + uris: str, + risotto_context: Context, level: str, - data: Any= None) -> None: - insert = 'INSERT INTO log(Msg, Path, Username, Level' - values = 'VALUES($1,$2,$3,$4' - args = [msg, path, risotto_context.username, level] + data: Any=None, + start: bool=False, + ) -> None: + insert = 'INSERT INTO RisottoLog(Msg, URI, URIS, UserLogin, Level' + values = 'VALUES($1,$2,$3,$4,$5' + args = [msg, uri, uris, risotto_context.username, level] if data: insert += ', Data' - values += ',$5' + values += ',$6' args.append(dumps(data)) + context_id = risotto_context.context_id + if context_id is not None: + insert += ', ContextId' + if data: + values += ',$7' + else: + values += ',$6' + args.append(context_id) - sql = insert + ') ' + values + ')' + sql = insert + ') ' + values + ') RETURNING LogId' try: - await risotto_context.connection.fetch(sql, *args) + log_id = await risotto_context.connection.fetchval(sql, *args) + if context_id is None and start: + risotto_context.context_id = log_id + if start: + risotto_context.start_id = log_id except UndefinedTableError as err: raise Exception(_(f'cannot access to database ({err}), was the database really created?')) + async def query(self, + risotto_context: Context, + context_id: int, + uri: Optional[str], + ) -> list: + sql = '''SELECT Msg as msg, URI as uri_name, URIS as uris, UserLogin as user_login, Level as level, Data as data, StartDate as start_date, StopDate as stop_date + FROM RisottoLog + WHERE UserLogin = $1 AND (LogId = $2 OR ContextId = $2) + ''' + args = [sql, risotto_context.username, context_id] + if uri is not None: + sql += ' AND URI = $3' + args.append(uri) + ret = [] + for row in await risotto_context.connection.fetch(*args): + d = {} + for key, value in row.items(): + if key == 'data': + if not value: + value = {} + else: + value = loads(value) + elif key in ['start_date', 'stop_date']: + value = str(value) + d[key] = value + ret.append(d) + return ret + + def _get_last_uri(self, + risotto_context: Context, + ) -> str: + if risotto_context.paths: + return risotto_context.paths[-1] + return '' + def _get_message_paths(self, - risotto_context: Context): + risotto_context: Context, + ) -> str: + if not risotto_context.paths: + return '' paths = risotto_context.paths if risotto_context.type: paths_msg = f' {risotto_context.type} ' @@ -49,44 +102,109 @@ class Logger: risotto_context: Context, arguments, error: str, - msg: str=''): + msg: str='', + ): """ send message when an error append """ paths_msg = self._get_message_paths(risotto_context) print(_(f'{risotto_context.username}: ERROR: {error} ({paths_msg} with arguments "{arguments}": {msg})')) await self.insert(msg, + self._get_last_uri(risotto_context), paths_msg, risotto_context, 'Error', - arguments) + arguments, + ) async def info_msg(self, risotto_context: Context, arguments: Dict, - msg: str=''): + msg: str='', + ) -> None: """ send message with common information """ - if risotto_context.paths: - paths_msg = self._get_message_paths(risotto_context) - else: - paths_msg = '' + paths_msg = self._get_message_paths(risotto_context) if get_config()['global']['debug']: print(_(f'{risotto_context.username}: INFO:{paths_msg}: {msg}')) await self.insert(msg, + self._get_last_uri(risotto_context), paths_msg, risotto_context, 'Info', - arguments) + arguments, + ) + + async def start(self, + risotto_context: Context, + arguments: dict, + msg: str, + ) -> None: + paths_msg = self._get_message_paths(risotto_context) + if get_config()['global']['debug']: + print(_(f'{risotto_context.username}: START:{paths_msg}: {msg}')) + await self.insert(msg, + self._get_last_uri(risotto_context), + paths_msg, + risotto_context, + 'Start', + arguments, + start=True, + ) + + async def success(self, + risotto_context: Context, + returns: Optional[dict]=None, + ) -> None: + if get_config()['global']['debug']: + paths_msg = self._get_message_paths(risotto_context) + print(_(f'{risotto_context.username}: SUCCESS:{paths_msg}({risotto_context.context_id})')) + sql = """UPDATE RisottoLog + SET StopDate = $2, + Level = 'SUCCESS' + """ + args = [datetime.now()] + if returns: + sql += """, Returns = $3 + """ + args.append(dumps(returns)) + sql += """WHERE LogId = $1 + """ + await risotto_context.connection.execute(sql, + risotto_context.start_id, + *args, + ) + + async def failed(self, + risotto_context: Context, + err: str, + ) -> None: + if get_config()['global']['debug']: + paths_msg = self._get_message_paths(risotto_context) + print(_(f'{risotto_context.username}: FAILED:{paths_msg}({risotto_context.context_id}): err')) + sql = """UPDATE RisottoLog + SET StopDate = $2, + Level = 'FAILED', + Msg = $3 + WHERE LogId = $1 + """ + await risotto_context.connection.execute(sql, + risotto_context.start_id, + datetime.now(), + err, + ) async def info(self, risotto_context, - msg): + msg, + ): if get_config()['global']['debug']: print(msg) await self.insert(msg, + '', None, risotto_context, - 'Info') + 'Info', + ) log = Logger() diff --git a/src/risotto/register.py b/src/risotto/register.py index b37037b..44cb9bb 100644 --- a/src/risotto/register.py +++ b/src/risotto/register.py @@ -84,11 +84,11 @@ def register(uris: str, def decorator(function): for uri in uris: - version, message = uri.split('.', 1) - dispatcher.set_function(version, - message, + dispatcher.set_function(uri, notification, - function) + function, + function.__module__ + ) return decorator @@ -185,21 +185,20 @@ class RegisterDispatcher: raise RegistrationError(_(f'error with {module_name}.{function_name} arguments: {msg}')) def set_function(self, - version: str, - message: str, + uri: str, notification: str, function: Callable, + full_module_name: str, ): """ register a function to an URI URI is a message """ - + version, message = uri.split('.', 1) # check if message exists if message not in self.messages[version]: raise RegistrationError(_(f'the message {message} not exists')) # xxx submodule can only be register with v1.yyy.xxx..... message - full_module_name = function.__module__ risotto_module_name, submodule_name = full_module_name.split('.')[-3:-1] module_name = risotto_module_name.split('_')[-1] message_module, message_submodule, message_name = message.split('.', 2) @@ -215,7 +214,7 @@ class RegisterDispatcher: # check if already register if 'function' in self.messages[version][message]: - raise RegistrationError(_(f'uri {version}.{message} already registered')) + raise RegistrationError(_(f'uri {uri} already registered')) # register if self.messages[version][message]['pattern'] == 'rpc': @@ -288,9 +287,9 @@ class RegisterDispatcher: for message, message_obj in messages.items(): if not 'functions' in message_obj and not 'function' in message_obj: if message_obj['pattern'] == 'event': - print(f'{message} prêche dans le désert') + print(f'{version}.{message} prêche dans le désert') else: - missing_messages.append(message) + missing_messages.append(f'{version}.{message}') if missing_messages: raise RegistrationError(_(f'no matching function for uri {missing_messages}'))