insert log in database

This commit is contained in:
Emmanuel Garette 2019-12-28 12:29:11 +01:00
parent a6383f0c2c
commit 4020f97db0
6 changed files with 292 additions and 220 deletions

View File

@ -82,6 +82,17 @@ CREATE TABLE RoleURI (
PRIMARY KEY (RoleName, URIId)
);
-- Log table creation
CREATE TABLE log(
Msg VARCHAR(255) NOT NULL,
Level VARCHAR(10) NOT NULL,
Path VARCHAR(255) NOT NULL,
Username VARCHAR(100) NOT NULL,
Data JSON,
Date timestamp DEFAULT current_timestamp
);
"""
async def main():

View File

@ -1,7 +1,7 @@
from tiramisu import Config
from traceback import print_exc
from copy import copy
from typing import Dict, Callable
from typing import Dict, Callable, List, Optional
from json import dumps, loads
from .utils import _
@ -17,21 +17,22 @@ import asyncpg
class CallDispatcher:
async def valid_call_returns(self,
risotto_context: Context,
function,
returns: Dict,
kwargs: Dict):
response = self.messages[risotto_context.version][risotto_context.message]['response']
module_name = risotto_context.function.__module__.split('.')[-2]
function_name = risotto_context.function.__name__
module_name = function.__module__.split('.')[-2]
function_name = function.__name__
if response.impl_get_information('multi'):
if not isinstance(returns, list):
err = _(f'function {module_name}.{function_name} has to return a list')
log.error_msg(risotto_context, kwargs, err)
await log.error_msg(risotto_context, kwargs, err)
raise CallError(str(err))
else:
if not isinstance(returns, dict):
log.error_msg(risotto_context, kwargs, returns)
await log.error_msg(risotto_context, kwargs, returns)
err = _(f'function {module_name}.{function_name} has to return a dict')
log.error_msg(risotto_context, kwargs, err)
await log.error_msg(risotto_context, kwargs, err)
raise CallError(str(err))
returns = [returns]
if response is None:
@ -46,11 +47,11 @@ class CallDispatcher:
await config.option(key).value.set(value)
except AttributeError:
err = _(f'function {module_name}.{function_name} return the unknown parameter "{key}"')
log.error_msg(risotto_context, kwargs, err)
await log.error_msg(risotto_context, kwargs, err)
raise CallError(str(err))
except ValueError:
err = _(f'function {module_name}.{function_name} return the parameter "{key}" with an unvalid value "{value}"')
log.error_msg(risotto_context, kwargs, err)
await log.error_msg(risotto_context, kwargs, err)
raise CallError(str(err))
await config.property.read_only()
mandatories = await config.value.mandatory()
@ -61,7 +62,7 @@ class CallDispatcher:
await config.value.dict()
except Exception as err:
err = _(f'function {module_name}.{function_name} return an invalid response {err}')
log.error_msg(risotto_context, kwargs, err)
await log.error_msg(risotto_context, kwargs, err)
raise CallError(str(err))
async def call(self,
@ -77,63 +78,32 @@ class CallDispatcher:
version,
message,
'rpc')
self.check_message_type(risotto_context,
kwargs)
try:
kw = await self.load_kwargs_to_config(risotto_context,
f'{version}.{message}',
kwargs,
check_role)
function_obj = self.messages[version][message]
risotto_context.function = function_obj['function']
if function_obj['risotto_context']:
kw['risotto_context'] = risotto_context
# do not start a new database connection
if function_obj['database'] and hasattr(old_risotto_context, 'connection'):
risotto_context.connection = old_risotto_context.connection
if function_obj['database'] and not hasattr(risotto_context, 'connection'):
async with self.pool.acquire() as connection:
await connection.set_type_codec(
'json',
encoder=dumps,
decoder=loads,
schema='pg_catalog'
)
risotto_context.connection = connection
async with connection.transaction():
returns = await risotto_context.function(self.injected_self[function_obj['module']], **kw)
else:
returns = await risotto_context.function(self.injected_self[function_obj['module']], **kw)
except CallError as err:
raise err
except Exception as err:
if get_config().get('global').get('debug'):
print_exc()
log.error_msg(risotto_context,
kwargs,
err)
raise CallError(str(err))
# valid returns
await self.valid_call_returns(risotto_context,
returns,
kwargs)
# log the success
log.info_msg(risotto_context,
kwargs,
_(f'returns {returns}'))
# notification
if function_obj.get('notification'):
notif_version, notif_message = function_obj['notification'].split('.', 1)
if not isinstance(returns, list):
send_returns = [returns]
else:
send_returns = returns
for ret in send_returns:
await self.publish(notif_version,
notif_message,
risotto_context,
**ret)
return returns
function_objs = [self.messages[version][message]]
# do not start a new database connection
if hasattr(old_risotto_context, 'connection'):
risotto_context.connection = old_risotto_context.connection
return await self.launch(version,
message,
risotto_context,
check_role,
kwargs,
function_objs)
else:
async with self.pool.acquire() as connection:
await connection.set_type_codec(
'json',
encoder=dumps,
decoder=loads,
schema='pg_catalog'
)
risotto_context.connection = connection
async with connection.transaction():
return await self.launch(version,
message,
risotto_context,
check_role,
kwargs,
function_objs)
class PublishDispatcher:
@ -147,66 +117,32 @@ class PublishDispatcher:
version,
message,
'event')
self.check_message_type(risotto_context,
kwargs)
try:
config_arguments = await self.load_kwargs_to_config(risotto_context,
f'{version}.{message}',
kwargs,
check_role)
except CallError as err:
return
except Exception as err:
# if there is a problem with arguments, just send an error et do nothing
if DEBUG:
print_exc()
log.error_msg(risotto_context, kwargs, err)
return
# config is ok, so publish the message
for function_obj in self.messages[version][message].get('functions', []):
function = function_obj['function']
module_name = function.__module__.split('.')[-2]
function_name = function.__name__
info_msg = _(f'in module {module_name}.{function_name}')
try:
# build argument for this function
kw = {}
for key, value in config_arguments.items():
if key in function_obj['arguments']:
kw[key] = value
if function_obj['risotto_context']:
kw['risotto_context'] = risotto_context
# send event
if function_obj['database'] and hasattr(old_risotto_context, 'connection'):
risotto_context.connection = old_risotto_context.connection
if function_obj['database'] and not hasattr(risotto_context, 'connection'):
async with self.pool.acquire() as connection:
await connection.set_type_codec(
'json',
encoder=dumps,
decoder=loads,
schema='pg_catalog'
)
risotto_context.connection = connection
async with connection.transaction():
returns = await function(self.injected_self[function_obj['module']], **kw)
else:
returns = await function(self.injected_self[function_obj['module']], **kw)
except Exception as err:
if DEBUG:
print_exc()
log.error_msg(risotto_context, kwargs, err, info_msg)
continue
else:
log.info_msg(risotto_context, kwargs, info_msg)
# notification
if function_obj.get('notification'):
notif_version, notif_message = function_obj['notification'].split('.', 1)
await self.publish(notif_version,
notif_message,
risotto_context,
**returns)
function_objs = self.messages[version][message].get('functions', [])
# do not start a new database connection
if hasattr(old_risotto_context, 'connection'):
risotto_context.connection = old_risotto_context.connection
return await self.launch(version,
message,
risotto_context,
check_role,
kwargs,
function_objs)
else:
async with self.pool.acquire() as connection:
await connection.set_type_codec(
'json',
encoder=dumps,
decoder=loads,
schema='pg_catalog'
)
risotto_context.connection = connection
async with connection.transaction():
return await self.launch(version,
message,
risotto_context,
check_role,
kwargs,
function_objs)
class Dispatcher(register.RegisterDispatcher, CallDispatcher, PublishDispatcher):
@ -231,12 +167,12 @@ class Dispatcher(register.RegisterDispatcher, CallDispatcher, PublishDispatcher)
risotto_context.version = version
return risotto_context
def check_message_type(self,
risotto_context: Context,
kwargs: Dict):
async def check_message_type(self,
risotto_context: Context,
kwargs: Dict):
if self.messages[risotto_context.version][risotto_context.message]['pattern'] != risotto_context.type:
msg = _(f'{risotto_context.uri} is not a {risotto_context.type} message')
log.error_msg(risotto_context, kwargs, msg)
await log.error_msg(risotto_context, kwargs, msg)
raise CallError(msg)
async def load_kwargs_to_config(self,
@ -324,6 +260,87 @@ class Dispatcher(register.RegisterDispatcher, CallDispatcher, PublishDispatcher)
return
raise NotAllowedError(_(f'You ({user_login}) don\'t have any authorisation to access to "{uri}"'))
async def launch(self,
version: str,
message: str,
risotto_context: Context,
check_role: bool,
kwargs: Dict,
function_objs: List) -> Optional[Dict]:
await self.check_message_type(risotto_context,
kwargs)
try:
config_arguments = await self.load_kwargs_to_config(risotto_context,
f'{version}.{message}',
kwargs,
check_role)
except Exception as err:
# if there is a problem with arguments, just send an error et do nothing
if DEBUG:
print_exc()
await log.error_msg(risotto_context, kwargs, err)
return
# config is ok, so send the message
for function_obj in function_objs:
function = function_obj['function']
module_name = function.__module__.split('.')[-2]
function_name = function.__name__
info_msg = _(f'in module {module_name}.{function_name}')
try:
# build argument for this function
if risotto_context.type == 'rpc':
kw = config_arguments
else:
kw = {}
for key, value in config_arguments.items():
if key in function_obj['arguments']:
kw[key] = value
if function_obj['risotto_context']:
kw['risotto_context'] = risotto_context
returns = await function(self.injected_self[function_obj['module']], **kw)
except CallError as err:
if risotto_context.type == 'rpc':
raise err
continue
except Exception as err:
if get_config().get('global').get('debug'):
print_exc()
await log.error_msg(risotto_context,
kwargs,
err)
if risotto_context.type == 'rpc':
raise CallError(str(err))
continue
else:
if risotto_context.type == 'rpc':
# valid returns
await self.valid_call_returns(risotto_context,
function,
returns,
kwargs)
# log the success
await log.info_msg(risotto_context,
{'arguments': kwargs,
'returns': returns},
info_msg)
# notification
if function_obj.get('notification'):
notif_version, notif_message = function_obj['notification'].split('.', 1)
if not isinstance(returns, list):
send_returns = [returns]
else:
send_returns = returns
for ret in send_returns:
await self.publish(notif_version,
notif_message,
risotto_context,
**ret)
if risotto_context.type == 'rpc':
return returns
dispatcher = Dispatcher()
register.dispatcher = dispatcher

View File

@ -55,8 +55,8 @@ class extra_route_handler:
if DEBUG:
print_exc()
raise HTTPInternalServerError(reason=str(err))
log.info_msg(kwargs['risotto_context'],
dict(request.match_info))
await log.info_msg(kwargs['risotto_context'],
dict(request.match_info))
return Response(text=dumps(returns))

View File

@ -1,4 +1,5 @@
from typing import Dict
from typing import Dict, Any
from json import dumps
from .context import Context
from .utils import _
from .config import DEBUG
@ -8,6 +9,28 @@ class Logger:
""" An object to manager log
FIXME should add event to a database
"""
async def insert(self,
msg: str,
path: str,
risotto_context: str,
level: str,
data: Any= None) -> None:
insert = 'INSERT INTO log(Msg, Path, Username, Level'
values = 'VALUES($1,$2,$3,$4'
args = [msg, path, risotto_context.username, level]
if data:
insert += ', Data'
values += ',$5'
args.append(dumps(data))
sql = insert + ') ' + values + ')'
print(sql, args)
if not hasattr(risotto_context, 'connection'):
print('MANQUE CONNEXION !!!')
print(sql)
else:
await risotto_context.connection.fetch(sql, *args)
def _get_message_paths(self,
risotto_context: Context):
paths = risotto_context.paths
@ -23,40 +46,55 @@ class Logger:
paths_msg += ':'
return paths_msg
def error_msg(self,
risotto_context: Context,
arguments,
error: str,
msg: str=''):
async def error_msg(self,
risotto_context: Context,
arguments,
error: str,
msg: str=''):
""" send message when an error append
"""
paths_msg = self._get_message_paths(risotto_context)
# if DEBUG:
print(_(f'{risotto_context.username}: ERROR: {error} ({paths_msg} with arguments "{arguments}": {msg})'))
await self.insert(msg,
paths_msg,
risotto_context,
'Error',
arguments)
def info_msg(self,
risotto_context: Context,
arguments: Dict,
msg: str=''):
async def info_msg(self,
risotto_context: Context,
arguments: Dict,
msg: str=''):
""" send message with common information
"""
if risotto_context.paths:
paths_msg = self._get_message_paths(risotto_context)
else:
paths_msg = ''
tmsg = _(f'{risotto_context.username}: INFO:{paths_msg}')
if arguments:
tmsg += _(f' with arguments "{arguments}"')
if msg:
tmsg += f' {msg}'
if DEBUG:
print(tmsg)
tmsg = _(f'{risotto_context.username}: INFO:{paths_msg}')
if arguments:
tmsg += _(f' with arguments "{arguments}"')
if msg:
tmsg += f' {msg}'
def info(self,
msg):
print(tmsg)
await self.insert(msg,
paths_msg,
risotto_context,
'Info',
arguments)
async def info(self,
risotto_context,
msg):
if DEBUG:
print(msg)
await self.insert(msg,
paths_msg,
risotto_context,
'Info')
log = Logger()

View File

@ -10,7 +10,7 @@ from rougail import load as rougail_load
from ...controller import Controller
from ...register import register
from ...config import DATABASE_DIR, DEBUG, ROUGAIL_DTD_PATH, get_config
from ...config import DATABASE_DIR, ROUGAIL_DTD_PATH, get_config
from ...context import Context
from ...utils import _
from ...error import CallError, RegistrationError
@ -43,9 +43,9 @@ class Risotto(Controller):
risotto_context: Context) -> None:
""" load all available servermodels
"""
log.info_msg(risotto_context,
None,
'Load servermodels')
await log.info_msg(risotto_context,
None,
'Load servermodels')
servermodels = await self.call('v1.servermodel.list',
risotto_context)
@ -79,9 +79,9 @@ class Risotto(Controller):
"""
cache_file = join(self.cache_root_path, str(servermodel_id), "dictionaries.xml")
funcs_file = self.get_funcs_filename(servermodel_id)
log.info_msg(risotto_context,
None,
f'Load servermodel {servermodel_name} ({servermodel_id})')
await log.info_msg(risotto_context,
None,
f'Load servermodel {servermodel_name} ({servermodel_id})')
# use file in cache
with open(cache_file) as fileio:
@ -148,37 +148,34 @@ class Risotto(Controller):
if servermodel_parent_id is None:
return
if not self.servermodel.get(servermodel_parent_id):
if DEBUG:
msg = _(f'Servermodel with id {servermodel_parent_id} not loaded, skipping legacy for servermodel {servermodel_name} ({servermodel_id})')
log.error_msg(risotto_context,
None,
msg)
msg = _(f'Servermodel with id {servermodel_parent_id} not loaded, skipping legacy for servermodel {servermodel_name} ({servermodel_id})')
await log.error_msg(risotto_context,
None,
msg)
return
servermodel_parent = self.servermodel[servermodel_parent_id]
servermodel_parent_name = await servermodel_parent.information.get('servermodel_name')
if DEBUG:
msg = _(f'Create legacy of servermodel {servermodel_name} ({servermodel_id}) with parent {servermodel_parent_name} ({servermodel_parent_id})')
log.info_msg(risotto_context,
None,
msg)
msg = _(f'Create legacy of servermodel {servermodel_name} ({servermodel_id}) with parent {servermodel_parent_name} ({servermodel_parent_id})')
await log.info_msg(risotto_context,
None,
msg)
# do link
mix = await servermodel_parent.config.get('m_v_' + str(servermodel_parent_id))
try:
await mix.config.add(self.servermodel[servermodel_id])
except Exception as err:
if DEBUG:
log.error_msg(risotto_context,
None,
str(err))
await log.error_msg(risotto_context,
None,
str(err))
async def load_servers(self,
risotto_context: Context) -> None:
""" load all available servers
"""
log.info_msg(risotto_context,
None,
f'Load servers')
await log.info_msg(risotto_context,
None,
f'Load servers')
# get all servers
servers = await self.call('v1.server.list',
risotto_context)
@ -195,9 +192,9 @@ class Risotto(Controller):
server_name = server['server_name']
server_id = server['server_id']
msg = _(f'unable to load server {server_name} ({server_id}): {err}')
log.error_msg(risotto_context,
None,
msg)
await log.error_msg(risotto_context,
None,
msg)
async def load_server(self,
risotto_context: Context,
@ -208,14 +205,14 @@ class Risotto(Controller):
"""
if server_id in self.server:
return
log.info_msg(risotto_context,
None,
f'Load server {server_name} ({server_id})')
await log.info_msg(risotto_context,
None,
f'Load server {server_name} ({server_id})')
if not server_servermodel_id in self.servermodel:
msg = f'unable to find servermodel with id {server_servermodel_id}'
log.error_msg(risotto_context,
None,
msg)
await log.error_msg(risotto_context,
None,
msg)
raise CallError(msg)
# check if server was already created
@ -370,9 +367,9 @@ class Risotto(Controller):
deployed: bool) -> bytes:
if server_id not in self.server:
msg = _(f'cannot find server with id {server_id}')
log.error_msg(risotto_context,
None,
msg)
await log.error_msg(risotto_context,
None,
msg)
raise CallError(msg)
if deployed:
@ -388,9 +385,9 @@ class Risotto(Controller):
msg = _(f'No configuration available for server {server_id}')
else:
msg = _(f'No undeployed configuration available for server {server_id}')
log.error_msg(risotto_context,
None,
msg)
await log.error_msg(risotto_context,
None,
msg)
raise CallError(msg)
return {'server_id': server_id,
'deployed': deployed,

View File

@ -36,11 +36,12 @@ class Risotto(Controller):
release_distribution='stable')
self.internal_release_id = internal_release['release_id']
def servermodel_gen_funcs(self,
servermodel_name: str,
servermodel_id: int,
dependencies: Dict,
release_cache: Dict) -> None:
async def servermodel_gen_funcs(self,
servermodel_name: str,
servermodel_id: int,
dependencies: Dict,
release_cache: Dict,
risotto_context: Context) -> None:
as_names = []
dest_file = self.get_servermodel_cache(servermodel_id, 'funcs.py')
with open(dest_file, 'wb') as funcs:
@ -65,14 +66,16 @@ class Risotto(Controller):
funcs.write(b'\n')
as_names_str = '", "'.join(as_names)
log.info(_(f'gen funcs for "{servermodel_name}" with application services "{as_names_str}"'))
await log.info(risotto_context,
_(f'gen funcs for "{servermodel_name}" with application services "{as_names_str}"'))
eolobj = CreoleObjSpace(dtdfilename)
def servermodel_gen_schema(self,
servermodel_name: str,
servermodel_id: int,
dependencies: Dict,
release_cache: Dict) -> None:
async def servermodel_gen_schema(self,
servermodel_name: str,
servermodel_id: int,
dependencies: Dict,
release_cache: Dict,
risotto_context: Context) -> None:
paths = []
extras = []
as_names = set()
@ -105,7 +108,8 @@ class Risotto(Controller):
extras.append((namespace, [extra_dir]))
eolobj = CreoleObjSpace(dtdfilename)
as_names_str = '", "'.join(as_names)
log.info(_(f'gen schema for "{servermodel_name}" with application services "{as_names_str}"'))
await log.info(risotto_context,
_(f'gen schema for "{servermodel_name}" with application services "{as_names_str}"'))
eolobj.create_or_populate_from_xml('creole', paths)
for extra in extras:
eolobj.create_or_populate_from_xml(extra[0], extra[1])
@ -122,11 +126,12 @@ class Risotto(Controller):
return join(self.cache_root_path, str(servermodel_id), subdir)
return join(self.cache_root_path, str(servermodel_id))
def servermodel_copy_templates(self,
servermodel_name: str,
servermodel_id: int,
dependencies: Dict,
release_cache: Dict) -> None:
async def servermodel_copy_templates(self,
servermodel_name: str,
servermodel_id: int,
dependencies: Dict,
release_cache: Dict,
risotto_context: Context) -> None:
as_names = []
dest_dir = self.get_servermodel_cache(servermodel_id, 'templates')
makedirs(dest_dir)
@ -147,7 +152,8 @@ class Risotto(Controller):
copyfile(join(path, template), template_path)
as_names.append(applicationservice_name)
as_names_str = '", "'.join(as_names)
log.info(_(f'copy templates for "{servermodel_name}" with application services "{as_names_str}"'))
await log.info(risotto_context,
_(f'copy templates for "{servermodel_name}" with application services "{as_names_str}"'))
async def _servermodel_create(self,
risotto_context: Context,
@ -193,18 +199,21 @@ class Risotto(Controller):
risotto_context,
release_id=as_release_id)
self.servermodel_gen_funcs(servermodel_name,
servermodel_id,
dependencies,
release_cache)
self.servermodel_gen_schema(servermodel_name,
servermodel_id,
dependencies,
release_cache)
self.servermodel_copy_templates(servermodel_name,
servermodel_id,
dependencies,
release_cache)
await self.servermodel_gen_funcs(servermodel_name,
servermodel_id,
dependencies,
release_cache,
risotto_context)
await self.servermodel_gen_schema(servermodel_name,
servermodel_id,
dependencies,
release_cache,
risotto_context)
await self.servermodel_copy_templates(servermodel_name,
servermodel_id,
dependencies,
release_cache,
risotto_context)
sm_dict = {'servermodel_name': servermodel_name,
'servermodel_description': servermodel_description,
'servermodel_parents_id': servermodel_parents_id,