From 9ebe79d533a9351a76ea69b9381c74122e0c90de Mon Sep 17 00:00:00 2001 From: Emmanuel Garette Date: Sat, 24 Apr 2021 12:56:44 +0200 Subject: [PATCH] special connexion for log (do not rollback if error) --- src/risotto/dispatcher.py | 132 +++++++++++++++++++++----------------- src/risotto/http.py | 32 ++++++--- src/risotto/logger.py | 40 +++++++----- src/risotto/register.py | 61 +++++++++--------- 4 files changed, 152 insertions(+), 113 deletions(-) diff --git a/src/risotto/dispatcher.py b/src/risotto/dispatcher.py index e0361a0..929d958 100644 --- a/src/risotto/dispatcher.py +++ b/src/risotto/dispatcher.py @@ -48,8 +48,8 @@ class CallDispatcher: except AttributeError: err = _(f'function {module_name}.{function_name} return the unknown parameter "{key}" for the uri "{risotto_context.version}.{risotto_context.message}"') raise CallError(err) - except ValueError: - err = _(f'function {module_name}.{function_name} return the parameter "{key}" with an unvalid value "{value}" for the uri "{risotto_context.version}.{risotto_context.message}"') + except ValueError as err: + err = _(f'function {module_name}.{function_name} return the invalid parameter "{key}" for the uri "{risotto_context.version}.{risotto_context.message}": {err}') raise CallError(err) await config.property.read_only() mandatories = await config.value.mandatory() @@ -89,6 +89,7 @@ class CallDispatcher: if hasattr(old_risotto_context, 'connection'): # do not start a new database connection risotto_context.connection = old_risotto_context.connection + risotto_context.log_connection = old_risotto_context.log_connection await log.start(risotto_context, kwargs, info_msg, @@ -108,73 +109,84 @@ class CallDispatcher: config_arguments, function_obj, ) - except CallError as err: + await log.success(risotto_context, + ret, + ) + except Exception as err: await log.failed(risotto_context, str(err), ) - raise err from err + raise CallError(err) from err else: - try: - async with self.pool.acquire() as connection: - await connection.set_type_codec( - 'json', - encoder=dumps, - decoder=loads, - schema='pg_catalog' - ) - risotto_context.connection = connection - async with connection.transaction(): - try: - await log.start(risotto_context, - kwargs, - info_msg, - ) - await self.check_message_type(risotto_context, - kwargs, - ) - config_arguments = await self.load_kwargs_to_config(risotto_context, - f'{version}.{message}', - kwargs, - check_role, - internal, - ) - ret = await self.launch(risotto_context, + error = None + async with self.pool.acquire() as log_connection: + await log_connection.set_type_codec( + 'json', + encoder=dumps, + decoder=loads, + schema='pg_catalog' + ) + async with log_connection.transaction(): + try: + risotto_context.log_connection = log_connection + async with self.pool.acquire() as connection: + await connection.set_type_codec( + 'json', + encoder=dumps, + decoder=loads, + schema='pg_catalog' + ) + risotto_context.connection = connection + async with connection.transaction(): + try: + await log.start(risotto_context, kwargs, - config_arguments, - function_obj, + info_msg, ) - # log the success - await log.success(risotto_context, - ret, - ) - if not internal and isinstance(ret, dict): - ret['context_id'] = risotto_context.context_id - except CallError as err: - if get_config()['global']['debug']: - print_exc() - await log.failed(risotto_context, - str(err), - ) - raise err from err - except CallError as err: - raise err from err - except Exception as err: - # if there is a problem with arguments, just send an error and do nothing - if get_config()['global']['debug']: - print_exc() - async with self.pool.acquire() as connection: - await connection.set_type_codec( - 'json', - encoder=dumps, - decoder=loads, - schema='pg_catalog' - ) - risotto_context.connection = connection - async with connection.transaction(): + await self.check_message_type(risotto_context, + kwargs, + ) + config_arguments = await self.load_kwargs_to_config(risotto_context, + f'{version}.{message}', + kwargs, + check_role, + internal, + ) + ret = await self.launch(risotto_context, + kwargs, + config_arguments, + function_obj, + ) + # log the success + await log.success(risotto_context, + ret, + ) + if not internal and isinstance(ret, dict): + ret['context_id'] = risotto_context.context_id + except CallError as err: + if get_config()['global']['debug']: + print_exc() + await log.failed(risotto_context, + str(err), + ) + raise err from err + except CallError as err: + error = err + except Exception as err: + # if there is a problem with arguments, just send an error and do nothing + if get_config()['global']['debug']: + print_exc() await log.failed(risotto_context, str(err), ) - raise err from err + error = err + if error: + if not internal: + err = CallError(str(error)) + err.context_id = risotto_context.context_id + else: + err = error + raise err from error return ret diff --git a/src/risotto/http.py b/src/risotto/http.py index b5779b3..5b235f2 100644 --- a/src/risotto/http.py +++ b/src/risotto/http.py @@ -101,15 +101,31 @@ async def handle(request): internal=False, **kwargs, ) - except NotAllowedError as err: - raise HTTPNotFound(reason=str(err)) - except CallError as err: - raise HTTPBadRequest(reason=str(err).replace('\n', ' ')) except Exception as err: - if get_config()['global']['debug']: - print_exc() - raise HTTPInternalServerError(reason=str(err)) - return Response(text=dumps({'response': text}), + context_id = None + if isinstance(err, NotAllowedError): + error_type = HTTPNotFound + elif isinstance(err, CallError): + error_type = HTTPBadRequest + context_id = err.context_id + else: + if get_config()['global']['debug']: + print_exc() + error_type = HTTPInternalServerError + response = {'type': 'error', + 'reason': str(err).replace('\n', ' '), + } + if context_id is not None: + response['context_id'] = context_id + err = dumps({'response': response, + 'type': 'error', + }) + raise error_type(text=err, + content_type='application/json', + ) + return Response(text=dumps({'response': text, + 'type': 'success', + }), content_type='application/json', ) diff --git a/src/risotto/logger.py b/src/risotto/logger.py index 62bb058..6a286e9 100644 --- a/src/risotto/logger.py +++ b/src/risotto/logger.py @@ -38,7 +38,7 @@ class Logger: sql = insert + ') ' + values + ') RETURNING LogId' try: - log_id = await risotto_context.connection.fetchval(sql, *args) + log_id = await risotto_context.log_connection.fetchval(sql, *args) if context_id is None and start: risotto_context.context_id = log_id if start: @@ -60,14 +60,14 @@ class Logger: sql += ' AND URI = $3' args.append(uri) ret = [] - for row in await risotto_context.connection.fetch(*args): + for row in await risotto_context.log_connection.fetch(*args): d = {} for key, value in row.items(): if key == 'data': if not value: value = {} - else: - value = loads(value) +# else: +# value = loads(value) elif key in ['start_date', 'stop_date']: value = str(value) d[key] = value @@ -141,7 +141,11 @@ class Logger: ) -> None: paths_msg = self._get_message_paths(risotto_context) if get_config()['global']['debug']: - print(_(f'{risotto_context.username}: START:{paths_msg}: {msg}')) + if risotto_context.context_id != None: + context = f'({risotto_context.context_id})' + else: + context = '' + print(_(f'{risotto_context.username}: START{context}:{paths_msg}: {msg}')) await self.insert(msg, self._get_last_uri(risotto_context), paths_msg, @@ -157,7 +161,7 @@ class Logger: ) -> None: if get_config()['global']['debug']: paths_msg = self._get_message_paths(risotto_context) - print(_(f'{risotto_context.username}: SUCCESS:{paths_msg}({risotto_context.context_id})')) + print(_(f'{risotto_context.username}: SUCCESS({risotto_context.context_id}):{paths_msg}')) sql = """UPDATE RisottoLog SET StopDate = $2, Level = 'SUCCESS' @@ -169,10 +173,10 @@ class Logger: args.append(dumps(returns)) sql += """WHERE LogId = $1 """ - await risotto_context.connection.execute(sql, - risotto_context.start_id, - *args, - ) + await risotto_context.log_connection.execute(sql, + risotto_context.start_id, + *args, + ) async def failed(self, risotto_context: Context, @@ -180,18 +184,22 @@ class Logger: ) -> None: if get_config()['global']['debug']: paths_msg = self._get_message_paths(risotto_context) - print(_(f'{risotto_context.username}: FAILED:{paths_msg}({risotto_context.context_id}): err')) + if risotto_context.context_id != None: + context = f'({risotto_context.context_id})' + else: + context = '' + print(_(f'{risotto_context.username}: FAILED({risotto_context.context_id}):{paths_msg}: err')) sql = """UPDATE RisottoLog SET StopDate = $2, Level = 'FAILED', Msg = $3 WHERE LogId = $1 """ - await risotto_context.connection.execute(sql, - risotto_context.start_id, - datetime.now(), - err, - ) + await risotto_context.log_connection.execute(sql, + risotto_context.start_id, + datetime.now(), + err, + ) async def info(self, risotto_context, diff --git a/src/risotto/register.py b/src/risotto/register.py index 44cb9bb..07644f2 100644 --- a/src/risotto/register.py +++ b/src/risotto/register.py @@ -297,35 +297,38 @@ class RegisterDispatcher: truncate: bool=False, ) -> None: internal_user = get_config()['global']['internal_user'] - async with self.pool.acquire() as connection: - await connection.set_type_codec( - 'json', - encoder=dumps, - decoder=loads, - schema='pg_catalog' - ) - if truncate: - async with connection.transaction(): - await connection.execute('TRUNCATE InfraServer, InfraSite, InfraZone, Log, ProviderDeployment, ProviderFactoryCluster, ProviderFactoryClusterNode, SettingApplicationservice, SettingApplicationServiceDependency, SettingRelease, SettingServer, SettingServermodel, SettingSource, UserRole, UserRoleURI, UserURI, UserUser, InfraServermodel, ProviderZone, ProviderServer, ProviderSource, ProviderApplicationservice, ProviderServermodel') - async with connection.transaction(): - for submodule_name, module in self.injected_self.items(): - risotto_context = Context() - risotto_context.username = internal_user - risotto_context.paths.append(f'internal.{submodule_name}.on_join') - risotto_context.type = None - risotto_context.connection = connection - risotto_context.module = submodule_name.split('.', 1)[0] - info_msg = _(f'in function risotto_{submodule_name}.on_join') - await log.info_msg(risotto_context, - None, - info_msg) - try: - await module.on_join(risotto_context) - except Exception as err: - if get_config()['global']['debug']: - print_exc() - msg = _(f'on_join returns an error in module {submodule_name}: {err}') - await log.error_msg(risotto_context, {}, msg) + async with self.pool.acquire() as log_connection: + async with log_connection.transaction(): + async with self.pool.acquire() as connection: + await connection.set_type_codec( + 'json', + encoder=dumps, + decoder=loads, + schema='pg_catalog' + ) + if truncate: + async with connection.transaction(): + await connection.execute('TRUNCATE InfraServer, InfraSite, InfraZone, Log, ProviderDeployment, ProviderFactoryCluster, ProviderFactoryClusterNode, SettingApplicationservice, SettingApplicationServiceDependency, SettingRelease, SettingServer, SettingServermodel, SettingSource, UserRole, UserRoleURI, UserURI, UserUser, InfraServermodel, ProviderZone, ProviderServer, ProviderSource, ProviderApplicationservice, ProviderServermodel') + async with connection.transaction(): + for submodule_name, module in self.injected_self.items(): + risotto_context = Context() + risotto_context.username = internal_user + risotto_context.paths.append(f'internal.{submodule_name}.on_join') + risotto_context.type = None + risotto_context.log_connection = log_connection + risotto_context.connection = connection + risotto_context.module = submodule_name.split('.', 1)[0] + info_msg = _(f'in function risotto_{submodule_name}.on_join') + await log.info_msg(risotto_context, + None, + info_msg) + try: + await module.on_join(risotto_context) + except Exception as err: + if get_config()['global']['debug']: + print_exc() + msg = _(f'on_join returns an error in module {submodule_name}: {err}') + await log.error_msg(risotto_context, {}, msg) async def load(self): # valid function's arguments