Compare commits

...

12 Commits

7 changed files with 192 additions and 126 deletions

View File

@ -1,12 +1,12 @@
CREATE TABLE RisottoLog( CREATE TABLE RisottoLog(
LogId SERIAL PRIMARY KEY, LogId SERIAL PRIMARY KEY,
ContextId INTEGER,
Msg VARCHAR(255) NOT NULL, Msg VARCHAR(255) NOT NULL,
URI VARCHAR(255), URI VARCHAR(255),
URIS VARCHAR(255), URIS VARCHAR(255),
UserLogin VARCHAR(100) NOT NULL, UserLogin VARCHAR(100) NOT NULL,
Level VARCHAR(10) NOT NULL, Level VARCHAR(10) NOT NULL,
ContextId INTEGER, Kwargs JSON,
Data JSON,
Returns JSON, Returns JSON,
StartDate timestamp DEFAULT current_timestamp, StartDate timestamp DEFAULT current_timestamp,
StopDate timestamp StopDate timestamp

View File

@ -115,6 +115,11 @@ if 'PASSWORD_URL' in environ:
PASSWORD_URL = environ['PASSWORD_URL'] PASSWORD_URL = environ['PASSWORD_URL']
else: else:
PASSWORD_URL = config.get('PASSWORD_URL', 'https://localhost:8001/') PASSWORD_URL = config.get('PASSWORD_URL', 'https://localhost:8001/')
if 'PASSWORD_LENGTH' in environ:
PASSWORD_LENGTH = environ['PASSWORD_LENGTH']
else:
PASSWORD_LENGTH = config.get('PASSWORD_LENGTH', 20)
if 'PKI_ADMIN_PASSWORD' in environ: if 'PKI_ADMIN_PASSWORD' in environ:
PKI_ADMIN_PASSWORD = environ['PKI_ADMIN_PASSWORD'] PKI_ADMIN_PASSWORD = environ['PKI_ADMIN_PASSWORD']
else: else:
@ -155,6 +160,7 @@ _config = {'database': {'dsn': dsn_factory(RISOTTO_DB_NAME, RISOTTO_DB_USER, RIS
'admin_password': PASSWORD_ADMIN_PASSWORD, 'admin_password': PASSWORD_ADMIN_PASSWORD,
'device_identifier': PASSWORD_DEVICE_IDENTIFIER, 'device_identifier': PASSWORD_DEVICE_IDENTIFIER,
'service_url': PASSWORD_URL, 'service_url': PASSWORD_URL,
'length': PASSWORD_LENGTH,
}, },
'pki': {'admin_password': PKI_ADMIN_PASSWORD, 'pki': {'admin_password': PKI_ADMIN_PASSWORD,
'owner': PKI_ADMIN_EMAIL, 'owner': PKI_ADMIN_EMAIL,

View File

@ -3,3 +3,11 @@ class Context:
self.paths = [] self.paths = []
self.context_id = None self.context_id = None
self.start_id = None self.start_id = None
def copy(self):
context = Context()
for key, value in self.__dict__.items():
if key.startswith('__'):
continue
setattr(context, key, value)
return context

View File

@ -48,8 +48,8 @@ class CallDispatcher:
except AttributeError: except AttributeError:
err = _(f'function {module_name}.{function_name} return the unknown parameter "{key}" for the uri "{risotto_context.version}.{risotto_context.message}"') err = _(f'function {module_name}.{function_name} return the unknown parameter "{key}" for the uri "{risotto_context.version}.{risotto_context.message}"')
raise CallError(err) raise CallError(err)
except ValueError: except ValueError as err:
err = _(f'function {module_name}.{function_name} return the parameter "{key}" with an unvalid value "{value}" for the uri "{risotto_context.version}.{risotto_context.message}"') err = _(f'function {module_name}.{function_name} return the invalid parameter "{key}" for the uri "{risotto_context.version}.{risotto_context.message}": {err}')
raise CallError(err) raise CallError(err)
await config.property.read_only() await config.property.read_only()
mandatories = await config.value.mandatory() mandatories = await config.value.mandatory()
@ -108,12 +108,16 @@ class CallDispatcher:
config_arguments, config_arguments,
function_obj, function_obj,
) )
except CallError as err: await log.success(risotto_context,
ret,
)
except Exception as err:
await log.failed(risotto_context, await log.failed(risotto_context,
str(err), str(err),
) )
raise err from err raise CallError(err) from err
else: else:
error = None
try: try:
async with self.pool.acquire() as connection: async with self.pool.acquire() as connection:
await connection.set_type_codec( await connection.set_type_codec(
@ -157,24 +161,22 @@ class CallDispatcher:
) )
raise err from err raise err from err
except CallError as err: except CallError as err:
raise err from err error = err
except Exception as err: except Exception as err:
# if there is a problem with arguments, just send an error and do nothing # if there is a problem with arguments, just send an error and do nothing
if get_config()['global']['debug']: if get_config()['global']['debug']:
print_exc() print_exc()
async with self.pool.acquire() as connection: await log.failed(risotto_context,
await connection.set_type_codec( str(err),
'json', )
encoder=dumps, error = err
decoder=loads, if error:
schema='pg_catalog' if not internal:
) err = CallError(str(error))
risotto_context.connection = connection err.context_id = risotto_context.context_id
async with connection.transaction(): else:
await log.failed(risotto_context, err = error
str(err), raise err from error
)
raise err from err
return ret return ret
@ -187,8 +189,6 @@ class PublishDispatcher:
for message, message_infos in messages.items(): for message, message_infos in messages.items():
# event not emit locally # event not emit locally
if message_infos['pattern'] == 'event' and 'functions' in message_infos and message_infos['functions']: if message_infos['pattern'] == 'event' and 'functions' in message_infos and message_infos['functions']:
# module, submodule, submessage = message.split('.', 2)
# if f'{module}.{submodule}' not in self.injected_self:
uri = f'{version}.{message}' uri = f'{version}.{message}'
print(f' - {uri}') print(f' - {uri}')
await self.listened_connection.add_listener(uri, await self.listened_connection.add_listener(uri,
@ -224,21 +224,34 @@ class PublishDispatcher:
version, message = uri.split('.', 1) version, message = uri.split('.', 1)
loop = get_event_loop() loop = get_event_loop()
remote_kw = loads(payload) remote_kw = loads(payload)
risotto_context = self.build_new_context(remote_kw['context'], for function_obj in self.messages[version][message]['functions']:
version, risotto_context = self.build_new_context(remote_kw['context'],
message, version,
'event', message,
) 'event',
callback = lambda: ensure_future(self._publish(version, )
message, callback = self.get_callback(version, message, function_obj, risotto_context, remote_kw['kwargs'],)
risotto_context, loop.call_soon(callback)
**remote_kw['kwargs'],
)) def get_callback(self,
loop.call_soon(callback) version,
message,
function_obj,
risotto_context,
kwargs,
):
return lambda: ensure_future(self._publish(version,
message,
function_obj,
risotto_context,
**kwargs,
))
async def _publish(self, async def _publish(self,
version: str, version: str,
message: str, message: str,
function_obj,
risotto_context: Context, risotto_context: Context,
**kwargs, **kwargs,
) -> None: ) -> None:
@ -256,50 +269,40 @@ class PublishDispatcher:
schema='pg_catalog' schema='pg_catalog'
) )
risotto_context.connection = connection risotto_context.connection = connection
for function_obj in self.messages[version][message]['functions']: function_name = function_obj['function'].__name__
function_name = function_obj['function'].__name__ info_msg = _(f"call function {function_obj['full_module_name']}.{function_name}")
info_msg = _(f"call function {function_obj['full_module_name']}.{function_name}") try:
try: async with connection.transaction():
async with connection.transaction(): try:
try: await log.start(risotto_context,
await log.start(risotto_context, kwargs,
kwargs, info_msg,
info_msg, )
) await self.check_message_type(risotto_context,
await self.check_message_type(risotto_context, kwargs,
kwargs, )
) await self.launch(risotto_context,
await self.launch(risotto_context, kwargs,
kwargs, config_arguments,
config_arguments, function_obj,
function_obj, )
) # log the success
# log the success await log.success(risotto_context)
await log.success(risotto_context) except CallError as err:
except CallError as err: if get_config()['global']['debug']:
if get_config()['global']['debug']: print_exc()
print_exc() await log.failed(risotto_context,
await log.failed(risotto_context, str(err),
str(err), )
) except CallError:
except CallError: pass
pass except Exception as err:
except Exception as err: # if there is a problem with arguments, log and do nothing
# if there is a problem with arguments, log and do nothing if get_config()['global']['debug']:
if get_config()['global']['debug']: print_exc()
print_exc() await log.failed(risotto_context,
async with self.pool.acquire() as connection: str(err),
await connection.set_type_codec( )
'json',
encoder=dumps,
decoder=loads,
schema='pg_catalog'
)
risotto_context.connection = connection
async with connection.transaction():
await log.failed(risotto_context,
str(err),
)
class Dispatcher(register.RegisterDispatcher, class Dispatcher(register.RegisterDispatcher,
@ -327,6 +330,7 @@ class Dispatcher(register.RegisterDispatcher,
risotto_context.type = type risotto_context.type = type
risotto_context.message = message risotto_context.message = message
risotto_context.version = version risotto_context.version = version
risotto_context.pool = self.pool
return risotto_context return risotto_context
async def check_message_type(self, async def check_message_type(self,

View File

@ -101,15 +101,31 @@ async def handle(request):
internal=False, internal=False,
**kwargs, **kwargs,
) )
except NotAllowedError as err:
raise HTTPNotFound(reason=str(err))
except CallError as err:
raise HTTPBadRequest(reason=str(err).replace('\n', ' '))
except Exception as err: except Exception as err:
if get_config()['global']['debug']: context_id = None
print_exc() if isinstance(err, NotAllowedError):
raise HTTPInternalServerError(reason=str(err)) error_type = HTTPNotFound
return Response(text=dumps({'response': text}), elif isinstance(err, CallError):
error_type = HTTPBadRequest
context_id = err.context_id
else:
if get_config()['global']['debug']:
print_exc()
error_type = HTTPInternalServerError
response = {'type': 'error',
'reason': str(err).replace('\n', ' '),
}
if context_id is not None:
response['context_id'] = context_id
err = dumps({'response': response,
'type': 'error',
})
raise error_type(text=err,
content_type='application/json',
)
return Response(text=dumps({'response': text,
'type': 'success',
}),
content_type='application/json', content_type='application/json',
) )

View File

@ -2,35 +2,54 @@ from typing import Dict, Any, Optional
from json import dumps, loads from json import dumps, loads
from asyncpg.exceptions import UndefinedTableError from asyncpg.exceptions import UndefinedTableError
from datetime import datetime from datetime import datetime
from asyncio import Lock
from .context import Context from .context import Context
from .utils import _ from .utils import _
from .config import get_config from .config import get_config
database_lock = Lock()
class Logger: class Logger:
""" An object to manager log """ An object to manager log
""" """
def __init__(self) -> None:
self.log_connection = None
async def get_connection(self,
risotto_context: Context,
):
if not self.log_connection:
self.log_connection = await risotto_context.pool.acquire()
await self.log_connection.set_type_codec(
'json',
encoder=dumps,
decoder=loads,
schema='pg_catalog'
)
return self.log_connection
async def insert(self, async def insert(self,
msg: str, msg: str,
uri: str,
uris: str,
risotto_context: Context, risotto_context: Context,
level: str, level: str,
data: Any=None, kwargs: Any=None,
start: bool=False, start: bool=False,
) -> None: ) -> None:
uri = self._get_last_uri(risotto_context)
uris = " ".join(risotto_context.paths)
insert = 'INSERT INTO RisottoLog(Msg, URI, URIS, UserLogin, Level' insert = 'INSERT INTO RisottoLog(Msg, URI, URIS, UserLogin, Level'
values = 'VALUES($1,$2,$3,$4,$5' values = 'VALUES($1,$2,$3,$4,$5'
args = [msg, uri, uris, risotto_context.username, level] args = [msg, uri, uris, risotto_context.username, level]
if data: if kwargs:
insert += ', Data' insert += ', Kwargs'
values += ',$6' values += ',$6'
args.append(dumps(data)) args.append(dumps(kwargs))
context_id = risotto_context.context_id context_id = risotto_context.context_id
if context_id is not None: if context_id is not None:
insert += ', ContextId' insert += ', ContextId'
if data: if kwargs:
values += ',$7' values += ',$7'
else: else:
values += ',$6' values += ',$6'
@ -38,7 +57,9 @@ class Logger:
sql = insert + ') ' + values + ') RETURNING LogId' sql = insert + ') ' + values + ') RETURNING LogId'
try: try:
log_id = await risotto_context.connection.fetchval(sql, *args) async with database_lock:
connection = await self.get_connection(risotto_context)
log_id = await connection.fetchval(sql, *args)
if context_id is None and start: if context_id is None and start:
risotto_context.context_id = log_id risotto_context.context_id = log_id
if start: if start:
@ -51,7 +72,7 @@ class Logger:
context_id: int, context_id: int,
uri: Optional[str], uri: Optional[str],
) -> list: ) -> list:
sql = '''SELECT Msg as msg, URI as uri_name, URIS as uris, UserLogin as user_login, Level as level, Data as data, StartDate as start_date, StopDate as stop_date sql = '''SELECT Msg as msg, URI as uri_name, URIS as uris, UserLogin as user_login, Level as level, Kwargs as kwargs, Returns as returns, StartDate as start_date, StopDate as stop_date
FROM RisottoLog FROM RisottoLog
WHERE UserLogin = $1 AND (LogId = $2 OR ContextId = $2) WHERE UserLogin = $1 AND (LogId = $2 OR ContextId = $2)
''' '''
@ -60,18 +81,24 @@ class Logger:
sql += ' AND URI = $3' sql += ' AND URI = $3'
args.append(uri) args.append(uri)
ret = [] ret = []
for row in await risotto_context.connection.fetch(*args): async with database_lock:
d = {} connection = await self.get_connection(risotto_context)
for key, value in row.items(): for row in await connection.fetch(*args):
if key == 'data': d = {}
if not value: for key, value in row.items():
value = {} if key in ['kwargs', 'returns']:
else: if isinstance(value, dict):
value = loads(value) pass
elif key in ['start_date', 'stop_date']: elif not value:
value = str(value) value = {}
d[key] = value else:
ret.append(d) value = loads(value)
if key == 'uris':
value = value.split(' ')
elif key in ['start_date', 'stop_date']:
value = str(value)
d[key] = value
ret.append(d)
return ret return ret
def _get_last_uri(self, def _get_last_uri(self,
@ -109,8 +136,6 @@ class Logger:
paths_msg = self._get_message_paths(risotto_context) paths_msg = self._get_message_paths(risotto_context)
print(_(f'{risotto_context.username}: ERROR: {error} ({paths_msg} with arguments "{arguments}": {msg})')) print(_(f'{risotto_context.username}: ERROR: {error} ({paths_msg} with arguments "{arguments}": {msg})'))
await self.insert(msg, await self.insert(msg,
self._get_last_uri(risotto_context),
paths_msg,
risotto_context, risotto_context,
'Error', 'Error',
arguments, arguments,
@ -127,8 +152,6 @@ class Logger:
if get_config()['global']['debug']: if get_config()['global']['debug']:
print(_(f'{risotto_context.username}: INFO:{paths_msg}: {msg}')) print(_(f'{risotto_context.username}: INFO:{paths_msg}: {msg}'))
await self.insert(msg, await self.insert(msg,
self._get_last_uri(risotto_context),
paths_msg,
risotto_context, risotto_context,
'Info', 'Info',
arguments, arguments,
@ -141,10 +164,12 @@ class Logger:
) -> None: ) -> None:
paths_msg = self._get_message_paths(risotto_context) paths_msg = self._get_message_paths(risotto_context)
if get_config()['global']['debug']: if get_config()['global']['debug']:
print(_(f'{risotto_context.username}: START:{paths_msg}: {msg}')) if risotto_context.context_id != None:
context = f'({risotto_context.context_id})'
else:
context = ''
print(_(f'{risotto_context.username}: START{context}:{paths_msg}: {msg}'))
await self.insert(msg, await self.insert(msg,
self._get_last_uri(risotto_context),
paths_msg,
risotto_context, risotto_context,
'Start', 'Start',
arguments, arguments,
@ -157,7 +182,7 @@ class Logger:
) -> None: ) -> None:
if get_config()['global']['debug']: if get_config()['global']['debug']:
paths_msg = self._get_message_paths(risotto_context) paths_msg = self._get_message_paths(risotto_context)
print(_(f'{risotto_context.username}: SUCCESS:{paths_msg}({risotto_context.context_id})')) print(_(f'{risotto_context.username}: SUCCESS({risotto_context.context_id}):{paths_msg}'))
sql = """UPDATE RisottoLog sql = """UPDATE RisottoLog
SET StopDate = $2, SET StopDate = $2,
Level = 'SUCCESS' Level = 'SUCCESS'
@ -169,10 +194,12 @@ class Logger:
args.append(dumps(returns)) args.append(dumps(returns))
sql += """WHERE LogId = $1 sql += """WHERE LogId = $1
""" """
await risotto_context.connection.execute(sql, async with database_lock:
risotto_context.start_id, connection = await self.get_connection(risotto_context)
*args, await connection.execute(sql,
) risotto_context.start_id,
*args,
)
async def failed(self, async def failed(self,
risotto_context: Context, risotto_context: Context,
@ -180,18 +207,24 @@ class Logger:
) -> None: ) -> None:
if get_config()['global']['debug']: if get_config()['global']['debug']:
paths_msg = self._get_message_paths(risotto_context) paths_msg = self._get_message_paths(risotto_context)
print(_(f'{risotto_context.username}: FAILED:{paths_msg}({risotto_context.context_id}): err')) if risotto_context.context_id != None:
context = f'({risotto_context.context_id})'
else:
context = ''
print(_(f'{risotto_context.username}: FAILED({risotto_context.context_id}):{paths_msg}: {err}'))
sql = """UPDATE RisottoLog sql = """UPDATE RisottoLog
SET StopDate = $2, SET StopDate = $2,
Level = 'FAILED', Level = 'FAILED',
Msg = $3 Msg = $3
WHERE LogId = $1 WHERE LogId = $1
""" """
await risotto_context.connection.execute(sql, async with database_lock:
risotto_context.start_id, connection = await self.get_connection(risotto_context)
datetime.now(), await connection.execute(sql,
err, risotto_context.start_id,
) datetime.now(),
err,
)
async def info(self, async def info(self,
risotto_context, risotto_context,
@ -200,8 +233,6 @@ class Logger:
if get_config()['global']['debug']: if get_config()['global']['debug']:
print(msg) print(msg)
await self.insert(msg, await self.insert(msg,
'',
None,
risotto_context, risotto_context,
'Info', 'Info',
) )

View File

@ -313,6 +313,7 @@ class RegisterDispatcher:
risotto_context.username = internal_user risotto_context.username = internal_user
risotto_context.paths.append(f'internal.{submodule_name}.on_join') risotto_context.paths.append(f'internal.{submodule_name}.on_join')
risotto_context.type = None risotto_context.type = None
risotto_context.pool = self.pool
risotto_context.connection = connection risotto_context.connection = connection
risotto_context.module = submodule_name.split('.', 1)[0] risotto_context.module = submodule_name.split('.', 1)[0]
info_msg = _(f'in function risotto_{submodule_name}.on_join') info_msg = _(f'in function risotto_{submodule_name}.on_join')