From 1063d2e735b664fa9d81ba4d3a6c6292e2cf62cc Mon Sep 17 00:00:00 2001 From: Emmanuel Garette Date: Sun, 25 Apr 2021 20:32:02 +0200 Subject: [PATCH] on connection to database to log only --- src/risotto/context.py | 8 ++ src/risotto/dispatcher.py | 247 ++++++++++++++++++-------------------- src/risotto/logger.py | 53 +++++--- src/risotto/register.py | 59 +++++---- 4 files changed, 191 insertions(+), 176 deletions(-) diff --git a/src/risotto/context.py b/src/risotto/context.py index fea3627..acd8e20 100644 --- a/src/risotto/context.py +++ b/src/risotto/context.py @@ -3,3 +3,11 @@ class Context: self.paths = [] self.context_id = None self.start_id = None + + def copy(self): + context = Context() + for key, value in self.__dict__.items(): + if key.startswith('__'): + continue + setattr(context, key, value) + return context diff --git a/src/risotto/dispatcher.py b/src/risotto/dispatcher.py index cd56122..2d9c147 100644 --- a/src/risotto/dispatcher.py +++ b/src/risotto/dispatcher.py @@ -89,7 +89,6 @@ class CallDispatcher: if hasattr(old_risotto_context, 'connection'): # do not start a new database connection risotto_context.connection = old_risotto_context.connection - risotto_context.log_connection = old_risotto_context.log_connection await log.start(risotto_context, kwargs, info_msg, @@ -119,66 +118,58 @@ class CallDispatcher: raise CallError(err) from err else: error = None - async with self.pool.acquire() as log_connection: - await log_connection.set_type_codec( - 'json', - encoder=dumps, - decoder=loads, - schema='pg_catalog' - ) - risotto_context.log_connection = log_connection - try: - async with self.pool.acquire() as connection: - await connection.set_type_codec( - 'json', - encoder=dumps, - decoder=loads, - schema='pg_catalog' - ) - risotto_context.connection = connection - async with connection.transaction(): - try: - await log.start(risotto_context, - kwargs, - info_msg, - ) - await self.check_message_type(risotto_context, - kwargs, - ) - config_arguments = await self.load_kwargs_to_config(risotto_context, - f'{version}.{message}', - kwargs, - check_role, - internal, - ) - ret = await self.launch(risotto_context, - kwargs, - config_arguments, - function_obj, - ) - # log the success - await log.success(risotto_context, - ret, - ) - if not internal and isinstance(ret, dict): - ret['context_id'] = risotto_context.context_id - except CallError as err: - if get_config()['global']['debug']: - print_exc() - await log.failed(risotto_context, - str(err), - ) - raise err from err - except CallError as err: - error = err - except Exception as err: - # if there is a problem with arguments, just send an error and do nothing - if get_config()['global']['debug']: - print_exc() - await log.failed(risotto_context, - str(err), - ) - error = err + try: + async with self.pool.acquire() as connection: + await connection.set_type_codec( + 'json', + encoder=dumps, + decoder=loads, + schema='pg_catalog' + ) + risotto_context.connection = connection + async with connection.transaction(): + try: + await log.start(risotto_context, + kwargs, + info_msg, + ) + await self.check_message_type(risotto_context, + kwargs, + ) + config_arguments = await self.load_kwargs_to_config(risotto_context, + f'{version}.{message}', + kwargs, + check_role, + internal, + ) + ret = await self.launch(risotto_context, + kwargs, + config_arguments, + function_obj, + ) + # log the success + await log.success(risotto_context, + ret, + ) + if not internal and isinstance(ret, dict): + ret['context_id'] = risotto_context.context_id + except CallError as err: + if get_config()['global']['debug']: + print_exc() + await log.failed(risotto_context, + str(err), + ) + raise err from err + except CallError as err: + error = err + except Exception as err: + # if there is a problem with arguments, just send an error and do nothing + if get_config()['global']['debug']: + print_exc() + await log.failed(risotto_context, + str(err), + ) + error = err if error: if not internal: err = CallError(str(error)) @@ -198,8 +189,6 @@ class PublishDispatcher: for message, message_infos in messages.items(): # event not emit locally if message_infos['pattern'] == 'event' and 'functions' in message_infos and message_infos['functions']: -# module, submodule, submessage = message.split('.', 2) -# if f'{module}.{submodule}' not in self.injected_self: uri = f'{version}.{message}' print(f' - {uri}') await self.listened_connection.add_listener(uri, @@ -235,21 +224,34 @@ class PublishDispatcher: version, message = uri.split('.', 1) loop = get_event_loop() remote_kw = loads(payload) - risotto_context = self.build_new_context(remote_kw['context'], - version, - message, - 'event', - ) - callback = lambda: ensure_future(self._publish(version, - message, - risotto_context, - **remote_kw['kwargs'], - )) - loop.call_soon(callback) + for function_obj in self.messages[version][message]['functions']: + risotto_context = self.build_new_context(remote_kw['context'], + version, + message, + 'event', + ) + callback = self.get_callback(version, message, function_obj, risotto_context, remote_kw['kwargs'],) + loop.call_soon(callback) + + def get_callback(self, + version, + message, + function_obj, + risotto_context, + kwargs, + ): + return lambda: ensure_future(self._publish(version, + message, + function_obj, + risotto_context, + **kwargs, + )) + async def _publish(self, version: str, message: str, + function_obj, risotto_context: Context, **kwargs, ) -> None: @@ -259,66 +261,48 @@ class PublishDispatcher: False, False, ) - for function_obj in self.messages[version][message]['functions']: - async with self.pool.acquire() as log_connection: - await log_connection.set_type_codec( - 'json', - encoder=dumps, - decoder=loads, - schema='pg_catalog' - ) - risotto_context.log_connection = log_connection - async with self.pool.acquire() as connection: - await connection.set_type_codec( - 'json', - encoder=dumps, - decoder=loads, - schema='pg_catalog' - ) - risotto_context.connection = connection - function_name = function_obj['function'].__name__ - info_msg = _(f"call function {function_obj['full_module_name']}.{function_name}") + async with self.pool.acquire() as connection: + await connection.set_type_codec( + 'json', + encoder=dumps, + decoder=loads, + schema='pg_catalog' + ) + risotto_context.connection = connection + function_name = function_obj['function'].__name__ + info_msg = _(f"call function {function_obj['full_module_name']}.{function_name}") + try: + async with connection.transaction(): try: - async with connection.transaction(): - try: - await log.start(risotto_context, - kwargs, - info_msg, - ) - await self.check_message_type(risotto_context, - kwargs, - ) - await self.launch(risotto_context, - kwargs, - config_arguments, - function_obj, - ) - # log the success - await log.success(risotto_context) - except CallError as err: - if get_config()['global']['debug']: - print_exc() - await log.failed(risotto_context, - str(err), - ) - except CallError: - pass - except Exception as err: - # if there is a problem with arguments, log and do nothing + await log.start(risotto_context, + kwargs, + info_msg, + ) + await self.check_message_type(risotto_context, + kwargs, + ) + await self.launch(risotto_context, + kwargs, + config_arguments, + function_obj, + ) + # log the success + await log.success(risotto_context) + except CallError as err: if get_config()['global']['debug']: print_exc() - async with self.pool.acquire() as connection: - await connection.set_type_codec( - 'json', - encoder=dumps, - decoder=loads, - schema='pg_catalog' - ) - risotto_context.connection = connection - async with connection.transaction(): - await log.failed(risotto_context, - str(err), - ) + await log.failed(risotto_context, + str(err), + ) + except CallError: + pass + except Exception as err: + # if there is a problem with arguments, log and do nothing + if get_config()['global']['debug']: + print_exc() + await log.failed(risotto_context, + str(err), + ) class Dispatcher(register.RegisterDispatcher, @@ -346,6 +330,7 @@ class Dispatcher(register.RegisterDispatcher, risotto_context.type = type risotto_context.message = message risotto_context.version = version + risotto_context.pool = self.pool return risotto_context async def check_message_type(self, diff --git a/src/risotto/logger.py b/src/risotto/logger.py index 77044fe..4cc84ab 100644 --- a/src/risotto/logger.py +++ b/src/risotto/logger.py @@ -2,15 +2,34 @@ from typing import Dict, Any, Optional from json import dumps, loads from asyncpg.exceptions import UndefinedTableError from datetime import datetime +from asyncio import Lock from .context import Context from .utils import _ from .config import get_config +database_lock = Lock() + class Logger: """ An object to manager log """ + def __init__(self) -> None: + self.log_connection = None + + async def get_connection(self, + risotto_context: Context, + ): + if not self.log_connection: + self.log_connection = await risotto_context.pool.acquire() + await self.log_connection.set_type_codec( + 'json', + encoder=dumps, + decoder=loads, + schema='pg_catalog' + ) + return self.log_connection + async def insert(self, msg: str, risotto_context: Context, @@ -38,8 +57,9 @@ class Logger: sql = insert + ') ' + values + ') RETURNING LogId' try: - async with risotto_context.log_connection.transaction(): - log_id = await risotto_context.log_connection.fetchval(sql, *args) + async with database_lock: + connection = await self.get_connection(risotto_context) + log_id = await connection.fetchval(sql, *args) if context_id is None and start: risotto_context.context_id = log_id if start: @@ -61,8 +81,9 @@ class Logger: sql += ' AND URI = $3' args.append(uri) ret = [] - async with risotto_context.log_connection.transaction(): - for row in await risotto_context.log_connection.fetch(*args): + async with database_lock: + connection = await self.get_connection(risotto_context) + for row in await connection.fetch(*args): d = {} for key, value in row.items(): if key in ['kwargs', 'returns']: @@ -173,11 +194,12 @@ class Logger: args.append(dumps(returns)) sql += """WHERE LogId = $1 """ - async with risotto_context.log_connection.transaction(): - await risotto_context.log_connection.execute(sql, - risotto_context.start_id, - *args, - ) + async with database_lock: + connection = await self.get_connection(risotto_context) + await connection.execute(sql, + risotto_context.start_id, + *args, + ) async def failed(self, risotto_context: Context, @@ -196,12 +218,13 @@ class Logger: Msg = $3 WHERE LogId = $1 """ - async with risotto_context.log_connection.transaction(): - await risotto_context.log_connection.execute(sql, - risotto_context.start_id, - datetime.now(), - err, - ) + async with database_lock: + connection = await self.get_connection(risotto_context) + await connection.execute(sql, + risotto_context.start_id, + datetime.now(), + err, + ) async def info(self, risotto_context, diff --git a/src/risotto/register.py b/src/risotto/register.py index d9ba8be..b08a1b3 100644 --- a/src/risotto/register.py +++ b/src/risotto/register.py @@ -297,37 +297,36 @@ class RegisterDispatcher: truncate: bool=False, ) -> None: internal_user = get_config()['global']['internal_user'] - async with self.pool.acquire() as log_connection: - async with self.pool.acquire() as connection: - await connection.set_type_codec( - 'json', - encoder=dumps, - decoder=loads, - schema='pg_catalog' - ) - if truncate: - async with connection.transaction(): - await connection.execute('TRUNCATE InfraServer, InfraSite, InfraZone, Log, ProviderDeployment, ProviderFactoryCluster, ProviderFactoryClusterNode, SettingApplicationservice, SettingApplicationServiceDependency, SettingRelease, SettingServer, SettingServermodel, SettingSource, UserRole, UserRoleURI, UserURI, UserUser, InfraServermodel, ProviderZone, ProviderServer, ProviderSource, ProviderApplicationservice, ProviderServermodel') + async with self.pool.acquire() as connection: + await connection.set_type_codec( + 'json', + encoder=dumps, + decoder=loads, + schema='pg_catalog' + ) + if truncate: async with connection.transaction(): - for submodule_name, module in self.injected_self.items(): - risotto_context = Context() - risotto_context.username = internal_user - risotto_context.paths.append(f'internal.{submodule_name}.on_join') - risotto_context.type = None - risotto_context.log_connection = log_connection - risotto_context.connection = connection - risotto_context.module = submodule_name.split('.', 1)[0] - info_msg = _(f'in function risotto_{submodule_name}.on_join') - await log.info_msg(risotto_context, - None, - info_msg) - try: - await module.on_join(risotto_context) - except Exception as err: - if get_config()['global']['debug']: - print_exc() - msg = _(f'on_join returns an error in module {submodule_name}: {err}') - await log.error_msg(risotto_context, {}, msg) + await connection.execute('TRUNCATE InfraServer, InfraSite, InfraZone, Log, ProviderDeployment, ProviderFactoryCluster, ProviderFactoryClusterNode, SettingApplicationservice, SettingApplicationServiceDependency, SettingRelease, SettingServer, SettingServermodel, SettingSource, UserRole, UserRoleURI, UserURI, UserUser, InfraServermodel, ProviderZone, ProviderServer, ProviderSource, ProviderApplicationservice, ProviderServermodel') + async with connection.transaction(): + for submodule_name, module in self.injected_self.items(): + risotto_context = Context() + risotto_context.username = internal_user + risotto_context.paths.append(f'internal.{submodule_name}.on_join') + risotto_context.type = None + risotto_context.pool = self.pool + risotto_context.connection = connection + risotto_context.module = submodule_name.split('.', 1)[0] + info_msg = _(f'in function risotto_{submodule_name}.on_join') + await log.info_msg(risotto_context, + None, + info_msg) + try: + await module.on_join(risotto_context) + except Exception as err: + if get_config()['global']['debug']: + print_exc() + msg = _(f'on_join returns an error in module {submodule_name}: {err}') + await log.error_msg(risotto_context, {}, msg) async def load(self): # valid function's arguments