better log support

This commit is contained in:
Emmanuel Garette 2021-04-24 10:12:39 +02:00
parent 19240489db
commit 4c83e6d89d
6 changed files with 274 additions and 102 deletions

View File

@ -1,8 +1,16 @@
CREATE TABLE log( CREATE TABLE RisottoLog(
LogId SERIAL PRIMARY KEY,
Msg VARCHAR(255) NOT NULL, Msg VARCHAR(255) NOT NULL,
URI VARCHAR(255),
URIS VARCHAR(255),
UserLogin VARCHAR(100) NOT NULL,
Level VARCHAR(10) NOT NULL, Level VARCHAR(10) NOT NULL,
Path VARCHAR(255), ContextId INTEGER,
Username VARCHAR(100) NOT NULL,
Data JSON, Data JSON,
Date timestamp DEFAULT current_timestamp Returns JSON,
StartDate timestamp DEFAULT current_timestamp,
StopDate timestamp
); );
CREATE INDEX RisottoLog_ContextId_index ON RisottoLog(ContextId);
CREATE INDEX RisottoLog_Login_index ON RisottoLog(UserLogin);
CREATE INDEX RisottoLog_URI_index ON RisottoLog(URI);

View File

@ -17,6 +17,10 @@ if 'RISOTTO_PORT' in environ:
RISOTTO_PORT = environ['RISOTTO_PORT'] RISOTTO_PORT = environ['RISOTTO_PORT']
else: else:
RISOTTO_PORT = config.get('RISOTTO_PORT', 8080) RISOTTO_PORT = config.get('RISOTTO_PORT', 8080)
if 'RISOTTO_URL' in environ:
RISOTTO_URL = environ['RISOTTO_URL']
else:
RISOTTO_URL = config.get('RISOTTO_URL', 'http://localhost:8080/')
if 'CONFIGURATION_DIR' in environ: if 'CONFIGURATION_DIR' in environ:
CONFIGURATION_DIR = environ['CONFIGURATION_DIR'] CONFIGURATION_DIR = environ['CONFIGURATION_DIR']
else: else:
@ -135,7 +139,8 @@ _config = {'database': {'dsn': dsn_factory(RISOTTO_DB_NAME, RISOTTO_DB_USER, RIS
'celery_dsn': dsn_factory(CELERYRISOTTO_DB_NAME, CELERYRISOTTO_DB_USER, CELERYRISOTTO_DB_PASSWORD) 'celery_dsn': dsn_factory(CELERYRISOTTO_DB_NAME, CELERYRISOTTO_DB_USER, CELERYRISOTTO_DB_PASSWORD)
}, },
'http_server': {'port': RISOTTO_PORT, 'http_server': {'port': RISOTTO_PORT,
'default_user': DEFAULT_USER}, 'default_user': DEFAULT_USER,
'url': RISOTTO_URL},
'global': {'message_root_path': MESSAGE_PATH, 'global': {'message_root_path': MESSAGE_PATH,
'configurations_dir': CONFIGURATION_DIR, 'configurations_dir': CONFIGURATION_DIR,
'debug': True, 'debug': True,

View File

@ -1,3 +1,5 @@
class Context: class Context:
def __init__(self): def __init__(self):
self.paths = [] self.paths = []
self.context_id = None
self.start_id = None

View File

@ -30,14 +30,11 @@ class CallDispatcher:
if response.impl_get_information('multi'): if response.impl_get_information('multi'):
if not isinstance(returns, list): if not isinstance(returns, list):
err = _(f'function {module_name}.{function_name} has to return a list') err = _(f'function {module_name}.{function_name} has to return a list')
await log.error_msg(risotto_context, kwargs, err) raise CallError(err)
raise CallError(str(err))
else: else:
if not isinstance(returns, dict): if not isinstance(returns, dict):
await log.error_msg(risotto_context, kwargs, returns)
err = _(f'function {module_name}.{function_name} has to return a dict') err = _(f'function {module_name}.{function_name} has to return a dict')
await log.error_msg(risotto_context, kwargs, err) raise CallError(err)
raise CallError(str(err))
returns = [returns] returns = [returns]
if response is None: if response is None:
raise Exception('hu?') raise Exception('hu?')
@ -50,12 +47,10 @@ class CallDispatcher:
await config.option(key).value.set(value) await config.option(key).value.set(value)
except AttributeError: except AttributeError:
err = _(f'function {module_name}.{function_name} return the unknown parameter "{key}" for the uri "{risotto_context.version}.{risotto_context.message}"') err = _(f'function {module_name}.{function_name} return the unknown parameter "{key}" for the uri "{risotto_context.version}.{risotto_context.message}"')
await log.error_msg(risotto_context, kwargs, err) raise CallError(err)
raise CallError(str(err))
except ValueError: except ValueError:
err = _(f'function {module_name}.{function_name} return the parameter "{key}" with an unvalid value "{value}" for the uri "{risotto_context.version}.{risotto_context.message}"') err = _(f'function {module_name}.{function_name} return the parameter "{key}" with an unvalid value "{value}" for the uri "{risotto_context.version}.{risotto_context.message}"')
await log.error_msg(risotto_context, kwargs, err) raise CallError(err)
raise CallError(str(err))
await config.property.read_only() await config.property.read_only()
mandatories = await config.value.mandatory() mandatories = await config.value.mandatory()
if mandatories: if mandatories:
@ -65,8 +60,7 @@ class CallDispatcher:
await config.value.dict() await config.value.dict()
except Exception as err: except Exception as err:
err = _(f'function {module_name}.{function_name} return an invalid response {err} for the uri "{risotto_context.version}.{risotto_context.message}"') err = _(f'function {module_name}.{function_name} return an invalid response {err} for the uri "{risotto_context.version}.{risotto_context.message}"')
await log.error_msg(risotto_context, kwargs, err) raise CallError(err)
raise CallError(str(err))
async def call(self, async def call(self,
version: str, version: str,
@ -89,9 +83,16 @@ class CallDispatcher:
if message not in self.messages[version]: if message not in self.messages[version]:
raise CallError(_(f'cannot find message "{version}.{message}"')) raise CallError(_(f'cannot find message "{version}.{message}"'))
function_obj = self.messages[version][message] function_obj = self.messages[version][message]
# do not start a new database connection # log
function_name = function_obj['function'].__name__
info_msg = _(f"call function {function_obj['full_module_name']}.{function_name}")
if hasattr(old_risotto_context, 'connection'): if hasattr(old_risotto_context, 'connection'):
# do not start a new database connection
risotto_context.connection = old_risotto_context.connection risotto_context.connection = old_risotto_context.connection
await log.start(risotto_context,
kwargs,
info_msg,
)
await self.check_message_type(risotto_context, await self.check_message_type(risotto_context,
kwargs, kwargs,
) )
@ -101,22 +102,19 @@ class CallDispatcher:
check_role, check_role,
internal, internal,
) )
return await self.launch(risotto_context, try:
kwargs, ret = await self.launch(risotto_context,
config_arguments, kwargs,
function_obj, config_arguments,
) function_obj,
)
except CallError as err:
await log.failed(risotto_context,
str(err),
)
raise err from err
else: else:
try: try:
await self.check_message_type(risotto_context,
kwargs,
)
config_arguments = await self.load_kwargs_to_config(risotto_context,
f'{version}.{message}',
kwargs,
check_role,
internal,
)
async with self.pool.acquire() as connection: async with self.pool.acquire() as connection:
await connection.set_type_codec( await connection.set_type_codec(
'json', 'json',
@ -126,13 +124,40 @@ class CallDispatcher:
) )
risotto_context.connection = connection risotto_context.connection = connection
async with connection.transaction(): async with connection.transaction():
return await self.launch(risotto_context, try:
kwargs, await log.start(risotto_context,
config_arguments, kwargs,
function_obj, info_msg,
) )
await self.check_message_type(risotto_context,
kwargs,
)
config_arguments = await self.load_kwargs_to_config(risotto_context,
f'{version}.{message}',
kwargs,
check_role,
internal,
)
ret = await self.launch(risotto_context,
kwargs,
config_arguments,
function_obj,
)
# log the success
await log.success(risotto_context,
ret,
)
if not internal and isinstance(ret, dict):
ret['context_id'] = risotto_context.context_id
except CallError as err:
if get_config()['global']['debug']:
print_exc()
await log.failed(risotto_context,
str(err),
)
raise err from err
except CallError as err: except CallError as err:
raise err raise err from err
except Exception as err: except Exception as err:
# if there is a problem with arguments, just send an error and do nothing # if there is a problem with arguments, just send an error and do nothing
if get_config()['global']['debug']: if get_config()['global']['debug']:
@ -146,8 +171,11 @@ class CallDispatcher:
) )
risotto_context.connection = connection risotto_context.connection = connection
async with connection.transaction(): async with connection.transaction():
await log.error_msg(risotto_context, kwargs, err) await log.failed(risotto_context,
raise err str(err),
)
raise err from err
return ret
class PublishDispatcher: class PublishDispatcher:
@ -180,6 +208,7 @@ class PublishDispatcher:
remote_kw = dumps({'kwargs': kwargs, remote_kw = dumps({'kwargs': kwargs,
'context': {'username': risotto_context.username, 'context': {'username': risotto_context.username,
'paths': risotto_context.paths, 'paths': risotto_context.paths,
'context_id': risotto_context.context_id,
} }
}) })
# FIXME should be better :/ # FIXME should be better :/
@ -219,26 +248,41 @@ class PublishDispatcher:
False, False,
False, False,
) )
for function_obj in self.messages[version][message]['functions']: async with self.pool.acquire() as connection:
async with self.pool.acquire() as connection: await connection.set_type_codec(
'json',
encoder=dumps,
decoder=loads,
schema='pg_catalog'
)
risotto_context.connection = connection
for function_obj in self.messages[version][message]['functions']:
function_name = function_obj['function'].__name__
info_msg = _(f"call function {function_obj['full_module_name']}.{function_name}")
try: try:
await self.check_message_type(risotto_context,
kwargs,
)
await connection.set_type_codec(
'json',
encoder=dumps,
decoder=loads,
schema='pg_catalog'
)
risotto_context.connection = connection
async with connection.transaction(): async with connection.transaction():
await self.launch(risotto_context, try:
kwargs, await log.start(risotto_context,
config_arguments, kwargs,
function_obj, info_msg,
) )
except CallError as err: await self.check_message_type(risotto_context,
kwargs,
)
await self.launch(risotto_context,
kwargs,
config_arguments,
function_obj,
)
# log the success
await log.success(risotto_context)
except CallError as err:
if get_config()['global']['debug']:
print_exc()
await log.failed(risotto_context,
str(err),
)
except CallError:
pass pass
except Exception as err: except Exception as err:
# if there is a problem with arguments, log and do nothing # if there is a problem with arguments, log and do nothing
@ -253,12 +297,15 @@ class PublishDispatcher:
) )
risotto_context.connection = connection risotto_context.connection = connection
async with connection.transaction(): async with connection.transaction():
await log.error_msg(risotto_context, kwargs, err) await log.failed(risotto_context,
str(err),
)
class Dispatcher(register.RegisterDispatcher, class Dispatcher(register.RegisterDispatcher,
CallDispatcher, CallDispatcher,
PublishDispatcher): PublishDispatcher,
):
""" Manage message (call or publish) """ Manage message (call or publish)
so launch a function when a message is called so launch a function when a message is called
""" """
@ -274,6 +321,7 @@ class Dispatcher(register.RegisterDispatcher,
risotto_context = Context() risotto_context = Context()
risotto_context.username = context['username'] risotto_context.username = context['username']
risotto_context.paths = copy(context['paths']) risotto_context.paths = copy(context['paths'])
risotto_context.context_id = context['context_id']
risotto_context.paths.append(uri) risotto_context.paths.append(uri)
risotto_context.uri = uri risotto_context.uri = uri
risotto_context.type = type risotto_context.type = type
@ -287,7 +335,6 @@ class Dispatcher(register.RegisterDispatcher,
) -> None: ) -> None:
if self.messages[risotto_context.version][risotto_context.message]['pattern'] != risotto_context.type: if self.messages[risotto_context.version][risotto_context.message]['pattern'] != risotto_context.type:
msg = _(f'{risotto_context.uri} is not a {risotto_context.type} message') msg = _(f'{risotto_context.uri} is not a {risotto_context.type} message')
await log.error_msg(risotto_context, kwargs, msg)
raise CallError(msg) raise CallError(msg)
async def load_kwargs_to_config(self, async def load_kwargs_to_config(self,
@ -396,8 +443,6 @@ class Dispatcher(register.RegisterDispatcher,
# so send the message # so send the message
function = function_obj['function'] function = function_obj['function']
risotto_context.module = function_obj['module'].split('.', 1)[0] risotto_context.module = function_obj['module'].split('.', 1)[0]
function_name = function.__name__
info_msg = _(f"in function {function_obj['full_module_name']}.{function_name}")
# build argument for this function # build argument for this function
if risotto_context.type == 'rpc': if risotto_context.type == 'rpc':
kw = config_arguments kw = config_arguments
@ -408,6 +453,7 @@ class Dispatcher(register.RegisterDispatcher,
kw[key] = value kw[key] = value
kw['risotto_context'] = risotto_context kw['risotto_context'] = risotto_context
# launch
returns = await function(self.get_service(function_obj['module']), **kw) returns = await function(self.get_service(function_obj['module']), **kw)
if risotto_context.type == 'rpc': if risotto_context.type == 'rpc':
# valid returns # valid returns
@ -416,12 +462,6 @@ class Dispatcher(register.RegisterDispatcher,
returns, returns,
kwargs, kwargs,
) )
# log the success
await log.info_msg(risotto_context,
{'arguments': kwargs,
'returns': returns},
info_msg,
)
# notification # notification
if function_obj.get('notification'): if function_obj.get('notification'):
notif_version, notif_message = function_obj['notification'].split('.', 1) notif_version, notif_message = function_obj['notification'].split('.', 1)

View File

@ -1,7 +1,7 @@
from typing import Dict, Any from typing import Dict, Any, Optional
from json import dumps from json import dumps, loads
from asyncpg.exceptions import UndefinedTableError from asyncpg.exceptions import UndefinedTableError
from datetime import datetime
from .context import Context from .context import Context
from .utils import _ from .utils import _
@ -13,26 +13,79 @@ class Logger:
""" """
async def insert(self, async def insert(self,
msg: str, msg: str,
path: str, uri: str,
risotto_context: str, uris: str,
risotto_context: Context,
level: str, level: str,
data: Any= None) -> None: data: Any=None,
insert = 'INSERT INTO log(Msg, Path, Username, Level' start: bool=False,
values = 'VALUES($1,$2,$3,$4' ) -> None:
args = [msg, path, risotto_context.username, level] insert = 'INSERT INTO RisottoLog(Msg, URI, URIS, UserLogin, Level'
values = 'VALUES($1,$2,$3,$4,$5'
args = [msg, uri, uris, risotto_context.username, level]
if data: if data:
insert += ', Data' insert += ', Data'
values += ',$5' values += ',$6'
args.append(dumps(data)) args.append(dumps(data))
context_id = risotto_context.context_id
if context_id is not None:
insert += ', ContextId'
if data:
values += ',$7'
else:
values += ',$6'
args.append(context_id)
sql = insert + ') ' + values + ')' sql = insert + ') ' + values + ') RETURNING LogId'
try: try:
await risotto_context.connection.fetch(sql, *args) log_id = await risotto_context.connection.fetchval(sql, *args)
if context_id is None and start:
risotto_context.context_id = log_id
if start:
risotto_context.start_id = log_id
except UndefinedTableError as err: except UndefinedTableError as err:
raise Exception(_(f'cannot access to database ({err}), was the database really created?')) raise Exception(_(f'cannot access to database ({err}), was the database really created?'))
async def query(self,
risotto_context: Context,
context_id: int,
uri: Optional[str],
) -> list:
sql = '''SELECT Msg as msg, URI as uri_name, URIS as uris, UserLogin as user_login, Level as level, Data as data, StartDate as start_date, StopDate as stop_date
FROM RisottoLog
WHERE UserLogin = $1 AND (LogId = $2 OR ContextId = $2)
'''
args = [sql, risotto_context.username, context_id]
if uri is not None:
sql += ' AND URI = $3'
args.append(uri)
ret = []
for row in await risotto_context.connection.fetch(*args):
d = {}
for key, value in row.items():
if key == 'data':
if not value:
value = {}
else:
value = loads(value)
elif key in ['start_date', 'stop_date']:
value = str(value)
d[key] = value
ret.append(d)
return ret
def _get_last_uri(self,
risotto_context: Context,
) -> str:
if risotto_context.paths:
return risotto_context.paths[-1]
return ''
def _get_message_paths(self, def _get_message_paths(self,
risotto_context: Context): risotto_context: Context,
) -> str:
if not risotto_context.paths:
return ''
paths = risotto_context.paths paths = risotto_context.paths
if risotto_context.type: if risotto_context.type:
paths_msg = f' {risotto_context.type} ' paths_msg = f' {risotto_context.type} '
@ -49,44 +102,109 @@ class Logger:
risotto_context: Context, risotto_context: Context,
arguments, arguments,
error: str, error: str,
msg: str=''): msg: str='',
):
""" send message when an error append """ send message when an error append
""" """
paths_msg = self._get_message_paths(risotto_context) paths_msg = self._get_message_paths(risotto_context)
print(_(f'{risotto_context.username}: ERROR: {error} ({paths_msg} with arguments "{arguments}": {msg})')) print(_(f'{risotto_context.username}: ERROR: {error} ({paths_msg} with arguments "{arguments}": {msg})'))
await self.insert(msg, await self.insert(msg,
self._get_last_uri(risotto_context),
paths_msg, paths_msg,
risotto_context, risotto_context,
'Error', 'Error',
arguments) arguments,
)
async def info_msg(self, async def info_msg(self,
risotto_context: Context, risotto_context: Context,
arguments: Dict, arguments: Dict,
msg: str=''): msg: str='',
) -> None:
""" send message with common information """ send message with common information
""" """
if risotto_context.paths: paths_msg = self._get_message_paths(risotto_context)
paths_msg = self._get_message_paths(risotto_context)
else:
paths_msg = ''
if get_config()['global']['debug']: if get_config()['global']['debug']:
print(_(f'{risotto_context.username}: INFO:{paths_msg}: {msg}')) print(_(f'{risotto_context.username}: INFO:{paths_msg}: {msg}'))
await self.insert(msg, await self.insert(msg,
self._get_last_uri(risotto_context),
paths_msg, paths_msg,
risotto_context, risotto_context,
'Info', 'Info',
arguments) arguments,
)
async def start(self,
risotto_context: Context,
arguments: dict,
msg: str,
) -> None:
paths_msg = self._get_message_paths(risotto_context)
if get_config()['global']['debug']:
print(_(f'{risotto_context.username}: START:{paths_msg}: {msg}'))
await self.insert(msg,
self._get_last_uri(risotto_context),
paths_msg,
risotto_context,
'Start',
arguments,
start=True,
)
async def success(self,
risotto_context: Context,
returns: Optional[dict]=None,
) -> None:
if get_config()['global']['debug']:
paths_msg = self._get_message_paths(risotto_context)
print(_(f'{risotto_context.username}: SUCCESS:{paths_msg}({risotto_context.context_id})'))
sql = """UPDATE RisottoLog
SET StopDate = $2,
Level = 'SUCCESS'
"""
args = [datetime.now()]
if returns:
sql += """, Returns = $3
"""
args.append(dumps(returns))
sql += """WHERE LogId = $1
"""
await risotto_context.connection.execute(sql,
risotto_context.start_id,
*args,
)
async def failed(self,
risotto_context: Context,
err: str,
) -> None:
if get_config()['global']['debug']:
paths_msg = self._get_message_paths(risotto_context)
print(_(f'{risotto_context.username}: FAILED:{paths_msg}({risotto_context.context_id}): err'))
sql = """UPDATE RisottoLog
SET StopDate = $2,
Level = 'FAILED',
Msg = $3
WHERE LogId = $1
"""
await risotto_context.connection.execute(sql,
risotto_context.start_id,
datetime.now(),
err,
)
async def info(self, async def info(self,
risotto_context, risotto_context,
msg): msg,
):
if get_config()['global']['debug']: if get_config()['global']['debug']:
print(msg) print(msg)
await self.insert(msg, await self.insert(msg,
'',
None, None,
risotto_context, risotto_context,
'Info') 'Info',
)
log = Logger() log = Logger()

View File

@ -84,11 +84,11 @@ def register(uris: str,
def decorator(function): def decorator(function):
for uri in uris: for uri in uris:
version, message = uri.split('.', 1) dispatcher.set_function(uri,
dispatcher.set_function(version,
message,
notification, notification,
function) function,
function.__module__
)
return decorator return decorator
@ -185,21 +185,20 @@ class RegisterDispatcher:
raise RegistrationError(_(f'error with {module_name}.{function_name} arguments: {msg}')) raise RegistrationError(_(f'error with {module_name}.{function_name} arguments: {msg}'))
def set_function(self, def set_function(self,
version: str, uri: str,
message: str,
notification: str, notification: str,
function: Callable, function: Callable,
full_module_name: str,
): ):
""" register a function to an URI """ register a function to an URI
URI is a message URI is a message
""" """
version, message = uri.split('.', 1)
# check if message exists # check if message exists
if message not in self.messages[version]: if message not in self.messages[version]:
raise RegistrationError(_(f'the message {message} not exists')) raise RegistrationError(_(f'the message {message} not exists'))
# xxx submodule can only be register with v1.yyy.xxx..... message # xxx submodule can only be register with v1.yyy.xxx..... message
full_module_name = function.__module__
risotto_module_name, submodule_name = full_module_name.split('.')[-3:-1] risotto_module_name, submodule_name = full_module_name.split('.')[-3:-1]
module_name = risotto_module_name.split('_')[-1] module_name = risotto_module_name.split('_')[-1]
message_module, message_submodule, message_name = message.split('.', 2) message_module, message_submodule, message_name = message.split('.', 2)
@ -215,7 +214,7 @@ class RegisterDispatcher:
# check if already register # check if already register
if 'function' in self.messages[version][message]: if 'function' in self.messages[version][message]:
raise RegistrationError(_(f'uri {version}.{message} already registered')) raise RegistrationError(_(f'uri {uri} already registered'))
# register # register
if self.messages[version][message]['pattern'] == 'rpc': if self.messages[version][message]['pattern'] == 'rpc':
@ -288,9 +287,9 @@ class RegisterDispatcher:
for message, message_obj in messages.items(): for message, message_obj in messages.items():
if not 'functions' in message_obj and not 'function' in message_obj: if not 'functions' in message_obj and not 'function' in message_obj:
if message_obj['pattern'] == 'event': if message_obj['pattern'] == 'event':
print(f'{message} prêche dans le désert') print(f'{version}.{message} prêche dans le désert')
else: else:
missing_messages.append(message) missing_messages.append(f'{version}.{message}')
if missing_messages: if missing_messages:
raise RegistrationError(_(f'no matching function for uri {missing_messages}')) raise RegistrationError(_(f'no matching function for uri {missing_messages}'))