diff --git a/sql/risotto.sql b/sql/risotto.sql index ad58260..0df932d 100644 --- a/sql/risotto.sql +++ b/sql/risotto.sql @@ -1,8 +1,16 @@ -CREATE TABLE log( +CREATE TABLE RisottoLog( + LogId SERIAL PRIMARY KEY, Msg VARCHAR(255) NOT NULL, + URI VARCHAR(255), + URIS VARCHAR(255), + UserLogin VARCHAR(100) NOT NULL, Level VARCHAR(10) NOT NULL, - Path VARCHAR(255), - Username VARCHAR(100) NOT NULL, + ContextId INTEGER, Data JSON, - Date timestamp DEFAULT current_timestamp + Returns JSON, + StartDate timestamp DEFAULT current_timestamp, + StopDate timestamp ); +CREATE INDEX RisottoLog_ContextId_index ON RisottoLog(ContextId); +CREATE INDEX RisottoLog_Login_index ON RisottoLog(UserLogin); +CREATE INDEX RisottoLog_URI_index ON RisottoLog(URI); diff --git a/src/risotto/config.py b/src/risotto/config.py index b87ba81..451cbeb 100644 --- a/src/risotto/config.py +++ b/src/risotto/config.py @@ -17,6 +17,10 @@ if 'RISOTTO_PORT' in environ: RISOTTO_PORT = environ['RISOTTO_PORT'] else: RISOTTO_PORT = config.get('RISOTTO_PORT', 8080) +if 'RISOTTO_URL' in environ: + RISOTTO_URL = environ['RISOTTO_URL'] +else: + RISOTTO_URL = config.get('RISOTTO_URL', 'http://localhost:8080/') if 'CONFIGURATION_DIR' in environ: CONFIGURATION_DIR = environ['CONFIGURATION_DIR'] else: @@ -135,7 +139,8 @@ _config = {'database': {'dsn': dsn_factory(RISOTTO_DB_NAME, RISOTTO_DB_USER, RIS 'celery_dsn': dsn_factory(CELERYRISOTTO_DB_NAME, CELERYRISOTTO_DB_USER, CELERYRISOTTO_DB_PASSWORD) }, 'http_server': {'port': RISOTTO_PORT, - 'default_user': DEFAULT_USER}, + 'default_user': DEFAULT_USER, + 'url': RISOTTO_URL}, 'global': {'message_root_path': MESSAGE_PATH, 'configurations_dir': CONFIGURATION_DIR, 'debug': True, diff --git a/src/risotto/context.py b/src/risotto/context.py index 835b035..fea3627 100644 --- a/src/risotto/context.py +++ b/src/risotto/context.py @@ -1,3 +1,5 @@ class Context: def __init__(self): self.paths = [] + self.context_id = None + self.start_id = None diff --git a/src/risotto/dispatcher.py b/src/risotto/dispatcher.py index 49b01fa..e0361a0 100644 --- a/src/risotto/dispatcher.py +++ b/src/risotto/dispatcher.py @@ -30,14 +30,11 @@ class CallDispatcher: if response.impl_get_information('multi'): if not isinstance(returns, list): err = _(f'function {module_name}.{function_name} has to return a list') - await log.error_msg(risotto_context, kwargs, err) - raise CallError(str(err)) + raise CallError(err) else: if not isinstance(returns, dict): - await log.error_msg(risotto_context, kwargs, returns) err = _(f'function {module_name}.{function_name} has to return a dict') - await log.error_msg(risotto_context, kwargs, err) - raise CallError(str(err)) + raise CallError(err) returns = [returns] if response is None: raise Exception('hu?') @@ -50,12 +47,10 @@ class CallDispatcher: await config.option(key).value.set(value) except AttributeError: err = _(f'function {module_name}.{function_name} return the unknown parameter "{key}" for the uri "{risotto_context.version}.{risotto_context.message}"') - await log.error_msg(risotto_context, kwargs, err) - raise CallError(str(err)) + raise CallError(err) except ValueError: err = _(f'function {module_name}.{function_name} return the parameter "{key}" with an unvalid value "{value}" for the uri "{risotto_context.version}.{risotto_context.message}"') - await log.error_msg(risotto_context, kwargs, err) - raise CallError(str(err)) + raise CallError(err) await config.property.read_only() mandatories = await config.value.mandatory() if mandatories: @@ -65,8 +60,7 @@ class CallDispatcher: await config.value.dict() except Exception as err: err = _(f'function {module_name}.{function_name} return an invalid response {err} for the uri "{risotto_context.version}.{risotto_context.message}"') - await log.error_msg(risotto_context, kwargs, err) - raise CallError(str(err)) + raise CallError(err) async def call(self, version: str, @@ -89,9 +83,16 @@ class CallDispatcher: if message not in self.messages[version]: raise CallError(_(f'cannot find message "{version}.{message}"')) function_obj = self.messages[version][message] - # do not start a new database connection + # log + function_name = function_obj['function'].__name__ + info_msg = _(f"call function {function_obj['full_module_name']}.{function_name}") if hasattr(old_risotto_context, 'connection'): + # do not start a new database connection risotto_context.connection = old_risotto_context.connection + await log.start(risotto_context, + kwargs, + info_msg, + ) await self.check_message_type(risotto_context, kwargs, ) @@ -101,22 +102,19 @@ class CallDispatcher: check_role, internal, ) - return await self.launch(risotto_context, - kwargs, - config_arguments, - function_obj, - ) + try: + ret = await self.launch(risotto_context, + kwargs, + config_arguments, + function_obj, + ) + except CallError as err: + await log.failed(risotto_context, + str(err), + ) + raise err from err else: try: - await self.check_message_type(risotto_context, - kwargs, - ) - config_arguments = await self.load_kwargs_to_config(risotto_context, - f'{version}.{message}', - kwargs, - check_role, - internal, - ) async with self.pool.acquire() as connection: await connection.set_type_codec( 'json', @@ -126,13 +124,40 @@ class CallDispatcher: ) risotto_context.connection = connection async with connection.transaction(): - return await self.launch(risotto_context, - kwargs, - config_arguments, - function_obj, - ) + try: + await log.start(risotto_context, + kwargs, + info_msg, + ) + await self.check_message_type(risotto_context, + kwargs, + ) + config_arguments = await self.load_kwargs_to_config(risotto_context, + f'{version}.{message}', + kwargs, + check_role, + internal, + ) + ret = await self.launch(risotto_context, + kwargs, + config_arguments, + function_obj, + ) + # log the success + await log.success(risotto_context, + ret, + ) + if not internal and isinstance(ret, dict): + ret['context_id'] = risotto_context.context_id + except CallError as err: + if get_config()['global']['debug']: + print_exc() + await log.failed(risotto_context, + str(err), + ) + raise err from err except CallError as err: - raise err + raise err from err except Exception as err: # if there is a problem with arguments, just send an error and do nothing if get_config()['global']['debug']: @@ -146,8 +171,11 @@ class CallDispatcher: ) risotto_context.connection = connection async with connection.transaction(): - await log.error_msg(risotto_context, kwargs, err) - raise err + await log.failed(risotto_context, + str(err), + ) + raise err from err + return ret class PublishDispatcher: @@ -180,6 +208,7 @@ class PublishDispatcher: remote_kw = dumps({'kwargs': kwargs, 'context': {'username': risotto_context.username, 'paths': risotto_context.paths, + 'context_id': risotto_context.context_id, } }) # FIXME should be better :/ @@ -219,26 +248,41 @@ class PublishDispatcher: False, False, ) - for function_obj in self.messages[version][message]['functions']: - async with self.pool.acquire() as connection: + async with self.pool.acquire() as connection: + await connection.set_type_codec( + 'json', + encoder=dumps, + decoder=loads, + schema='pg_catalog' + ) + risotto_context.connection = connection + for function_obj in self.messages[version][message]['functions']: + function_name = function_obj['function'].__name__ + info_msg = _(f"call function {function_obj['full_module_name']}.{function_name}") try: - await self.check_message_type(risotto_context, - kwargs, - ) - await connection.set_type_codec( - 'json', - encoder=dumps, - decoder=loads, - schema='pg_catalog' - ) - risotto_context.connection = connection async with connection.transaction(): - await self.launch(risotto_context, - kwargs, - config_arguments, - function_obj, - ) - except CallError as err: + try: + await log.start(risotto_context, + kwargs, + info_msg, + ) + await self.check_message_type(risotto_context, + kwargs, + ) + await self.launch(risotto_context, + kwargs, + config_arguments, + function_obj, + ) + # log the success + await log.success(risotto_context) + except CallError as err: + if get_config()['global']['debug']: + print_exc() + await log.failed(risotto_context, + str(err), + ) + except CallError: pass except Exception as err: # if there is a problem with arguments, log and do nothing @@ -253,12 +297,15 @@ class PublishDispatcher: ) risotto_context.connection = connection async with connection.transaction(): - await log.error_msg(risotto_context, kwargs, err) + await log.failed(risotto_context, + str(err), + ) class Dispatcher(register.RegisterDispatcher, CallDispatcher, - PublishDispatcher): + PublishDispatcher, + ): """ Manage message (call or publish) so launch a function when a message is called """ @@ -274,6 +321,7 @@ class Dispatcher(register.RegisterDispatcher, risotto_context = Context() risotto_context.username = context['username'] risotto_context.paths = copy(context['paths']) + risotto_context.context_id = context['context_id'] risotto_context.paths.append(uri) risotto_context.uri = uri risotto_context.type = type @@ -287,7 +335,6 @@ class Dispatcher(register.RegisterDispatcher, ) -> None: if self.messages[risotto_context.version][risotto_context.message]['pattern'] != risotto_context.type: msg = _(f'{risotto_context.uri} is not a {risotto_context.type} message') - await log.error_msg(risotto_context, kwargs, msg) raise CallError(msg) async def load_kwargs_to_config(self, @@ -396,8 +443,6 @@ class Dispatcher(register.RegisterDispatcher, # so send the message function = function_obj['function'] risotto_context.module = function_obj['module'].split('.', 1)[0] - function_name = function.__name__ - info_msg = _(f"in function {function_obj['full_module_name']}.{function_name}") # build argument for this function if risotto_context.type == 'rpc': kw = config_arguments @@ -408,6 +453,7 @@ class Dispatcher(register.RegisterDispatcher, kw[key] = value kw['risotto_context'] = risotto_context + # launch returns = await function(self.get_service(function_obj['module']), **kw) if risotto_context.type == 'rpc': # valid returns @@ -416,12 +462,6 @@ class Dispatcher(register.RegisterDispatcher, returns, kwargs, ) - # log the success - await log.info_msg(risotto_context, - {'arguments': kwargs, - 'returns': returns}, - info_msg, - ) # notification if function_obj.get('notification'): notif_version, notif_message = function_obj['notification'].split('.', 1) diff --git a/src/risotto/logger.py b/src/risotto/logger.py index dcbc94e..62bb058 100644 --- a/src/risotto/logger.py +++ b/src/risotto/logger.py @@ -1,7 +1,7 @@ -from typing import Dict, Any -from json import dumps +from typing import Dict, Any, Optional +from json import dumps, loads from asyncpg.exceptions import UndefinedTableError - +from datetime import datetime from .context import Context from .utils import _ @@ -13,26 +13,79 @@ class Logger: """ async def insert(self, msg: str, - path: str, - risotto_context: str, + uri: str, + uris: str, + risotto_context: Context, level: str, - data: Any= None) -> None: - insert = 'INSERT INTO log(Msg, Path, Username, Level' - values = 'VALUES($1,$2,$3,$4' - args = [msg, path, risotto_context.username, level] + data: Any=None, + start: bool=False, + ) -> None: + insert = 'INSERT INTO RisottoLog(Msg, URI, URIS, UserLogin, Level' + values = 'VALUES($1,$2,$3,$4,$5' + args = [msg, uri, uris, risotto_context.username, level] if data: insert += ', Data' - values += ',$5' + values += ',$6' args.append(dumps(data)) + context_id = risotto_context.context_id + if context_id is not None: + insert += ', ContextId' + if data: + values += ',$7' + else: + values += ',$6' + args.append(context_id) - sql = insert + ') ' + values + ')' + sql = insert + ') ' + values + ') RETURNING LogId' try: - await risotto_context.connection.fetch(sql, *args) + log_id = await risotto_context.connection.fetchval(sql, *args) + if context_id is None and start: + risotto_context.context_id = log_id + if start: + risotto_context.start_id = log_id except UndefinedTableError as err: raise Exception(_(f'cannot access to database ({err}), was the database really created?')) + async def query(self, + risotto_context: Context, + context_id: int, + uri: Optional[str], + ) -> list: + sql = '''SELECT Msg as msg, URI as uri_name, URIS as uris, UserLogin as user_login, Level as level, Data as data, StartDate as start_date, StopDate as stop_date + FROM RisottoLog + WHERE UserLogin = $1 AND (LogId = $2 OR ContextId = $2) + ''' + args = [sql, risotto_context.username, context_id] + if uri is not None: + sql += ' AND URI = $3' + args.append(uri) + ret = [] + for row in await risotto_context.connection.fetch(*args): + d = {} + for key, value in row.items(): + if key == 'data': + if not value: + value = {} + else: + value = loads(value) + elif key in ['start_date', 'stop_date']: + value = str(value) + d[key] = value + ret.append(d) + return ret + + def _get_last_uri(self, + risotto_context: Context, + ) -> str: + if risotto_context.paths: + return risotto_context.paths[-1] + return '' + def _get_message_paths(self, - risotto_context: Context): + risotto_context: Context, + ) -> str: + if not risotto_context.paths: + return '' paths = risotto_context.paths if risotto_context.type: paths_msg = f' {risotto_context.type} ' @@ -49,44 +102,109 @@ class Logger: risotto_context: Context, arguments, error: str, - msg: str=''): + msg: str='', + ): """ send message when an error append """ paths_msg = self._get_message_paths(risotto_context) print(_(f'{risotto_context.username}: ERROR: {error} ({paths_msg} with arguments "{arguments}": {msg})')) await self.insert(msg, + self._get_last_uri(risotto_context), paths_msg, risotto_context, 'Error', - arguments) + arguments, + ) async def info_msg(self, risotto_context: Context, arguments: Dict, - msg: str=''): + msg: str='', + ) -> None: """ send message with common information """ - if risotto_context.paths: - paths_msg = self._get_message_paths(risotto_context) - else: - paths_msg = '' + paths_msg = self._get_message_paths(risotto_context) if get_config()['global']['debug']: print(_(f'{risotto_context.username}: INFO:{paths_msg}: {msg}')) await self.insert(msg, + self._get_last_uri(risotto_context), paths_msg, risotto_context, 'Info', - arguments) + arguments, + ) + + async def start(self, + risotto_context: Context, + arguments: dict, + msg: str, + ) -> None: + paths_msg = self._get_message_paths(risotto_context) + if get_config()['global']['debug']: + print(_(f'{risotto_context.username}: START:{paths_msg}: {msg}')) + await self.insert(msg, + self._get_last_uri(risotto_context), + paths_msg, + risotto_context, + 'Start', + arguments, + start=True, + ) + + async def success(self, + risotto_context: Context, + returns: Optional[dict]=None, + ) -> None: + if get_config()['global']['debug']: + paths_msg = self._get_message_paths(risotto_context) + print(_(f'{risotto_context.username}: SUCCESS:{paths_msg}({risotto_context.context_id})')) + sql = """UPDATE RisottoLog + SET StopDate = $2, + Level = 'SUCCESS' + """ + args = [datetime.now()] + if returns: + sql += """, Returns = $3 + """ + args.append(dumps(returns)) + sql += """WHERE LogId = $1 + """ + await risotto_context.connection.execute(sql, + risotto_context.start_id, + *args, + ) + + async def failed(self, + risotto_context: Context, + err: str, + ) -> None: + if get_config()['global']['debug']: + paths_msg = self._get_message_paths(risotto_context) + print(_(f'{risotto_context.username}: FAILED:{paths_msg}({risotto_context.context_id}): err')) + sql = """UPDATE RisottoLog + SET StopDate = $2, + Level = 'FAILED', + Msg = $3 + WHERE LogId = $1 + """ + await risotto_context.connection.execute(sql, + risotto_context.start_id, + datetime.now(), + err, + ) async def info(self, risotto_context, - msg): + msg, + ): if get_config()['global']['debug']: print(msg) await self.insert(msg, + '', None, risotto_context, - 'Info') + 'Info', + ) log = Logger() diff --git a/src/risotto/register.py b/src/risotto/register.py index b37037b..44cb9bb 100644 --- a/src/risotto/register.py +++ b/src/risotto/register.py @@ -84,11 +84,11 @@ def register(uris: str, def decorator(function): for uri in uris: - version, message = uri.split('.', 1) - dispatcher.set_function(version, - message, + dispatcher.set_function(uri, notification, - function) + function, + function.__module__ + ) return decorator @@ -185,21 +185,20 @@ class RegisterDispatcher: raise RegistrationError(_(f'error with {module_name}.{function_name} arguments: {msg}')) def set_function(self, - version: str, - message: str, + uri: str, notification: str, function: Callable, + full_module_name: str, ): """ register a function to an URI URI is a message """ - + version, message = uri.split('.', 1) # check if message exists if message not in self.messages[version]: raise RegistrationError(_(f'the message {message} not exists')) # xxx submodule can only be register with v1.yyy.xxx..... message - full_module_name = function.__module__ risotto_module_name, submodule_name = full_module_name.split('.')[-3:-1] module_name = risotto_module_name.split('_')[-1] message_module, message_submodule, message_name = message.split('.', 2) @@ -215,7 +214,7 @@ class RegisterDispatcher: # check if already register if 'function' in self.messages[version][message]: - raise RegistrationError(_(f'uri {version}.{message} already registered')) + raise RegistrationError(_(f'uri {uri} already registered')) # register if self.messages[version][message]['pattern'] == 'rpc': @@ -288,9 +287,9 @@ class RegisterDispatcher: for message, message_obj in messages.items(): if not 'functions' in message_obj and not 'function' in message_obj: if message_obj['pattern'] == 'event': - print(f'{message} prêche dans le désert') + print(f'{version}.{message} prêche dans le désert') else: - missing_messages.append(message) + missing_messages.append(f'{version}.{message}') if missing_messages: raise RegistrationError(_(f'no matching function for uri {missing_messages}'))