From ed51bc483d99284d3957953187f6d1ea3490f7d5 Mon Sep 17 00:00:00 2001 From: Emmanuel Garette Date: Sat, 24 Apr 2021 17:11:06 +0200 Subject: [PATCH] corrections in log --- sql/risotto.sql | 4 +- src/risotto/dispatcher.py | 208 +++++++++++++++++++------------------- src/risotto/logger.py | 80 +++++++-------- src/risotto/register.py | 59 ++++++----- 4 files changed, 173 insertions(+), 178 deletions(-) diff --git a/sql/risotto.sql b/sql/risotto.sql index 0df932d..50812c2 100644 --- a/sql/risotto.sql +++ b/sql/risotto.sql @@ -1,12 +1,12 @@ CREATE TABLE RisottoLog( LogId SERIAL PRIMARY KEY, + ContextId INTEGER, Msg VARCHAR(255) NOT NULL, URI VARCHAR(255), URIS VARCHAR(255), UserLogin VARCHAR(100) NOT NULL, Level VARCHAR(10) NOT NULL, - ContextId INTEGER, - Data JSON, + Kwargs JSON, Returns JSON, StartDate timestamp DEFAULT current_timestamp, StopDate timestamp diff --git a/src/risotto/dispatcher.py b/src/risotto/dispatcher.py index b6969b8..cd56122 100644 --- a/src/risotto/dispatcher.py +++ b/src/risotto/dispatcher.py @@ -126,60 +126,59 @@ class CallDispatcher: decoder=loads, schema='pg_catalog' ) - async with log_connection.transaction(): - try: - risotto_context.log_connection = log_connection - async with self.pool.acquire() as connection: - await connection.set_type_codec( - 'json', - encoder=dumps, - decoder=loads, - schema='pg_catalog' - ) - risotto_context.connection = connection - async with connection.transaction(): - try: - await log.start(risotto_context, - kwargs, - info_msg, - ) - await self.check_message_type(risotto_context, - kwargs, - ) - config_arguments = await self.load_kwargs_to_config(risotto_context, - f'{version}.{message}', - kwargs, - check_role, - internal, - ) - ret = await self.launch(risotto_context, - kwargs, - config_arguments, - function_obj, - ) - # log the success - await log.success(risotto_context, - ret, - ) - if not internal and isinstance(ret, dict): - ret['context_id'] = risotto_context.context_id - except CallError as err: - if get_config()['global']['debug']: - print_exc() - await log.failed(risotto_context, - str(err), - ) - raise err from err - except CallError as err: - error = err - except Exception as err: - # if there is a problem with arguments, just send an error and do nothing - if get_config()['global']['debug']: - print_exc() - await log.failed(risotto_context, - str(err), - ) - error = err + risotto_context.log_connection = log_connection + try: + async with self.pool.acquire() as connection: + await connection.set_type_codec( + 'json', + encoder=dumps, + decoder=loads, + schema='pg_catalog' + ) + risotto_context.connection = connection + async with connection.transaction(): + try: + await log.start(risotto_context, + kwargs, + info_msg, + ) + await self.check_message_type(risotto_context, + kwargs, + ) + config_arguments = await self.load_kwargs_to_config(risotto_context, + f'{version}.{message}', + kwargs, + check_role, + internal, + ) + ret = await self.launch(risotto_context, + kwargs, + config_arguments, + function_obj, + ) + # log the success + await log.success(risotto_context, + ret, + ) + if not internal and isinstance(ret, dict): + ret['context_id'] = risotto_context.context_id + except CallError as err: + if get_config()['global']['debug']: + print_exc() + await log.failed(risotto_context, + str(err), + ) + raise err from err + except CallError as err: + error = err + except Exception as err: + # if there is a problem with arguments, just send an error and do nothing + if get_config()['global']['debug']: + print_exc() + await log.failed(risotto_context, + str(err), + ) + error = err if error: if not internal: err = CallError(str(error)) @@ -260,14 +259,14 @@ class PublishDispatcher: False, False, ) - async with self.pool.acquire() as log_connection: - await log_connection.set_type_codec( - 'json', - encoder=dumps, - decoder=loads, - schema='pg_catalog' - ) - async with log_connection.transaction(): + for function_obj in self.messages[version][message]['functions']: + async with self.pool.acquire() as log_connection: + await log_connection.set_type_codec( + 'json', + encoder=dumps, + decoder=loads, + schema='pg_catalog' + ) risotto_context.log_connection = log_connection async with self.pool.acquire() as connection: await connection.set_type_codec( @@ -277,50 +276,49 @@ class PublishDispatcher: schema='pg_catalog' ) risotto_context.connection = connection - for function_obj in self.messages[version][message]['functions']: - function_name = function_obj['function'].__name__ - info_msg = _(f"call function {function_obj['full_module_name']}.{function_name}") - try: + function_name = function_obj['function'].__name__ + info_msg = _(f"call function {function_obj['full_module_name']}.{function_name}") + try: + async with connection.transaction(): + try: + await log.start(risotto_context, + kwargs, + info_msg, + ) + await self.check_message_type(risotto_context, + kwargs, + ) + await self.launch(risotto_context, + kwargs, + config_arguments, + function_obj, + ) + # log the success + await log.success(risotto_context) + except CallError as err: + if get_config()['global']['debug']: + print_exc() + await log.failed(risotto_context, + str(err), + ) + except CallError: + pass + except Exception as err: + # if there is a problem with arguments, log and do nothing + if get_config()['global']['debug']: + print_exc() + async with self.pool.acquire() as connection: + await connection.set_type_codec( + 'json', + encoder=dumps, + decoder=loads, + schema='pg_catalog' + ) + risotto_context.connection = connection async with connection.transaction(): - try: - await log.start(risotto_context, - kwargs, - info_msg, - ) - await self.check_message_type(risotto_context, - kwargs, - ) - await self.launch(risotto_context, - kwargs, - config_arguments, - function_obj, - ) - # log the success - await log.success(risotto_context) - except CallError as err: - if get_config()['global']['debug']: - print_exc() - await log.failed(risotto_context, - str(err), - ) - except CallError: - pass - except Exception as err: - # if there is a problem with arguments, log and do nothing - if get_config()['global']['debug']: - print_exc() - async with self.pool.acquire() as connection: - await connection.set_type_codec( - 'json', - encoder=dumps, - decoder=loads, - schema='pg_catalog' - ) - risotto_context.connection = connection - async with connection.transaction(): - await log.failed(risotto_context, - str(err), - ) + await log.failed(risotto_context, + str(err), + ) class Dispatcher(register.RegisterDispatcher, diff --git a/src/risotto/logger.py b/src/risotto/logger.py index 84fc4b5..77044fe 100644 --- a/src/risotto/logger.py +++ b/src/risotto/logger.py @@ -13,24 +13,24 @@ class Logger: """ async def insert(self, msg: str, - uri: str, - uris: str, risotto_context: Context, level: str, - data: Any=None, + kwargs: Any=None, start: bool=False, ) -> None: + uri = self._get_last_uri(risotto_context) + uris = " ".join(risotto_context.paths) insert = 'INSERT INTO RisottoLog(Msg, URI, URIS, UserLogin, Level' values = 'VALUES($1,$2,$3,$4,$5' args = [msg, uri, uris, risotto_context.username, level] - if data: - insert += ', Data' + if kwargs: + insert += ', Kwargs' values += ',$6' - args.append(dumps(data)) + args.append(dumps(kwargs)) context_id = risotto_context.context_id if context_id is not None: insert += ', ContextId' - if data: + if kwargs: values += ',$7' else: values += ',$6' @@ -38,7 +38,8 @@ class Logger: sql = insert + ') ' + values + ') RETURNING LogId' try: - log_id = await risotto_context.log_connection.fetchval(sql, *args) + async with risotto_context.log_connection.transaction(): + log_id = await risotto_context.log_connection.fetchval(sql, *args) if context_id is None and start: risotto_context.context_id = log_id if start: @@ -51,7 +52,7 @@ class Logger: context_id: int, uri: Optional[str], ) -> list: - sql = '''SELECT Msg as msg, URI as uri_name, URIS as uris, UserLogin as user_login, Level as level, Data as data, StartDate as start_date, StopDate as stop_date + sql = '''SELECT Msg as msg, URI as uri_name, URIS as uris, UserLogin as user_login, Level as level, Kwargs as kwargs, Returns as returns, StartDate as start_date, StopDate as stop_date FROM RisottoLog WHERE UserLogin = $1 AND (LogId = $2 OR ContextId = $2) ''' @@ -60,20 +61,23 @@ class Logger: sql += ' AND URI = $3' args.append(uri) ret = [] - for row in await risotto_context.log_connection.fetch(*args): - d = {} - for key, value in row.items(): - if key == 'data': - if isinstance(value, dict): - pass - elif not value: - value = {} - else: - value = loads(value) - elif key in ['start_date', 'stop_date']: - value = str(value) - d[key] = value - ret.append(d) + async with risotto_context.log_connection.transaction(): + for row in await risotto_context.log_connection.fetch(*args): + d = {} + for key, value in row.items(): + if key in ['kwargs', 'returns']: + if isinstance(value, dict): + pass + elif not value: + value = {} + else: + value = loads(value) + if key == 'uris': + value = value.split(' ') + elif key in ['start_date', 'stop_date']: + value = str(value) + d[key] = value + ret.append(d) return ret def _get_last_uri(self, @@ -111,8 +115,6 @@ class Logger: paths_msg = self._get_message_paths(risotto_context) print(_(f'{risotto_context.username}: ERROR: {error} ({paths_msg} with arguments "{arguments}": {msg})')) await self.insert(msg, - self._get_last_uri(risotto_context), - paths_msg, risotto_context, 'Error', arguments, @@ -129,8 +131,6 @@ class Logger: if get_config()['global']['debug']: print(_(f'{risotto_context.username}: INFO:{paths_msg}: {msg}')) await self.insert(msg, - self._get_last_uri(risotto_context), - paths_msg, risotto_context, 'Info', arguments, @@ -149,8 +149,6 @@ class Logger: context = '' print(_(f'{risotto_context.username}: START{context}:{paths_msg}: {msg}')) await self.insert(msg, - self._get_last_uri(risotto_context), - paths_msg, risotto_context, 'Start', arguments, @@ -175,10 +173,11 @@ class Logger: args.append(dumps(returns)) sql += """WHERE LogId = $1 """ - await risotto_context.log_connection.execute(sql, - risotto_context.start_id, - *args, - ) + async with risotto_context.log_connection.transaction(): + await risotto_context.log_connection.execute(sql, + risotto_context.start_id, + *args, + ) async def failed(self, risotto_context: Context, @@ -190,18 +189,19 @@ class Logger: context = f'({risotto_context.context_id})' else: context = '' - print(_(f'{risotto_context.username}: FAILED({risotto_context.context_id}):{paths_msg}: err')) + print(_(f'{risotto_context.username}: FAILED({risotto_context.context_id}):{paths_msg}: {err}')) sql = """UPDATE RisottoLog SET StopDate = $2, Level = 'FAILED', Msg = $3 WHERE LogId = $1 """ - await risotto_context.log_connection.execute(sql, - risotto_context.start_id, - datetime.now(), - err, - ) + async with risotto_context.log_connection.transaction(): + await risotto_context.log_connection.execute(sql, + risotto_context.start_id, + datetime.now(), + err, + ) async def info(self, risotto_context, @@ -210,8 +210,6 @@ class Logger: if get_config()['global']['debug']: print(msg) await self.insert(msg, - '', - None, risotto_context, 'Info', ) diff --git a/src/risotto/register.py b/src/risotto/register.py index 07644f2..d9ba8be 100644 --- a/src/risotto/register.py +++ b/src/risotto/register.py @@ -298,37 +298,36 @@ class RegisterDispatcher: ) -> None: internal_user = get_config()['global']['internal_user'] async with self.pool.acquire() as log_connection: - async with log_connection.transaction(): - async with self.pool.acquire() as connection: - await connection.set_type_codec( - 'json', - encoder=dumps, - decoder=loads, - schema='pg_catalog' - ) - if truncate: - async with connection.transaction(): - await connection.execute('TRUNCATE InfraServer, InfraSite, InfraZone, Log, ProviderDeployment, ProviderFactoryCluster, ProviderFactoryClusterNode, SettingApplicationservice, SettingApplicationServiceDependency, SettingRelease, SettingServer, SettingServermodel, SettingSource, UserRole, UserRoleURI, UserURI, UserUser, InfraServermodel, ProviderZone, ProviderServer, ProviderSource, ProviderApplicationservice, ProviderServermodel') + async with self.pool.acquire() as connection: + await connection.set_type_codec( + 'json', + encoder=dumps, + decoder=loads, + schema='pg_catalog' + ) + if truncate: async with connection.transaction(): - for submodule_name, module in self.injected_self.items(): - risotto_context = Context() - risotto_context.username = internal_user - risotto_context.paths.append(f'internal.{submodule_name}.on_join') - risotto_context.type = None - risotto_context.log_connection = log_connection - risotto_context.connection = connection - risotto_context.module = submodule_name.split('.', 1)[0] - info_msg = _(f'in function risotto_{submodule_name}.on_join') - await log.info_msg(risotto_context, - None, - info_msg) - try: - await module.on_join(risotto_context) - except Exception as err: - if get_config()['global']['debug']: - print_exc() - msg = _(f'on_join returns an error in module {submodule_name}: {err}') - await log.error_msg(risotto_context, {}, msg) + await connection.execute('TRUNCATE InfraServer, InfraSite, InfraZone, Log, ProviderDeployment, ProviderFactoryCluster, ProviderFactoryClusterNode, SettingApplicationservice, SettingApplicationServiceDependency, SettingRelease, SettingServer, SettingServermodel, SettingSource, UserRole, UserRoleURI, UserURI, UserUser, InfraServermodel, ProviderZone, ProviderServer, ProviderSource, ProviderApplicationservice, ProviderServermodel') + async with connection.transaction(): + for submodule_name, module in self.injected_self.items(): + risotto_context = Context() + risotto_context.username = internal_user + risotto_context.paths.append(f'internal.{submodule_name}.on_join') + risotto_context.type = None + risotto_context.log_connection = log_connection + risotto_context.connection = connection + risotto_context.module = submodule_name.split('.', 1)[0] + info_msg = _(f'in function risotto_{submodule_name}.on_join') + await log.info_msg(risotto_context, + None, + info_msg) + try: + await module.on_join(risotto_context) + except Exception as err: + if get_config()['global']['debug']: + print_exc() + msg = _(f'on_join returns an error in module {submodule_name}: {err}') + await log.error_msg(risotto_context, {}, msg) async def load(self): # valid function's arguments