Compare commits

..

8 Commits

5 changed files with 240 additions and 195 deletions

View File

@ -1,12 +1,12 @@
CREATE TABLE RisottoLog(
LogId SERIAL PRIMARY KEY,
ContextId INTEGER,
Msg VARCHAR(255) NOT NULL,
URI VARCHAR(255),
URIS VARCHAR(255),
UserLogin VARCHAR(100) NOT NULL,
Level VARCHAR(10) NOT NULL,
ContextId INTEGER,
Data JSON,
Kwargs JSON,
Returns JSON,
StartDate timestamp DEFAULT current_timestamp,
StopDate timestamp

View File

@ -48,8 +48,8 @@ class CallDispatcher:
except AttributeError:
err = _(f'function {module_name}.{function_name} return the unknown parameter "{key}" for the uri "{risotto_context.version}.{risotto_context.message}"')
raise CallError(err)
except ValueError:
err = _(f'function {module_name}.{function_name} return the parameter "{key}" with an unvalid value "{value}" for the uri "{risotto_context.version}.{risotto_context.message}"')
except ValueError as err:
err = _(f'function {module_name}.{function_name} return the invalid parameter "{key}" for the uri "{risotto_context.version}.{risotto_context.message}": {err}')
raise CallError(err)
await config.property.read_only()
mandatories = await config.value.mandatory()
@ -89,6 +89,7 @@ class CallDispatcher:
if hasattr(old_risotto_context, 'connection'):
# do not start a new database connection
risotto_context.connection = old_risotto_context.connection
risotto_context.log_connection = old_risotto_context.log_connection
await log.start(risotto_context,
kwargs,
info_msg,
@ -108,12 +109,24 @@ class CallDispatcher:
config_arguments,
function_obj,
)
except CallError as err:
await log.success(risotto_context,
ret,
)
except Exception as err:
await log.failed(risotto_context,
str(err),
)
raise err from err
raise CallError(err) from err
else:
error = None
async with self.pool.acquire() as log_connection:
await log_connection.set_type_codec(
'json',
encoder=dumps,
decoder=loads,
schema='pg_catalog'
)
risotto_context.log_connection = log_connection
try:
async with self.pool.acquire() as connection:
await connection.set_type_codec(
@ -157,24 +170,22 @@ class CallDispatcher:
)
raise err from err
except CallError as err:
raise err from err
error = err
except Exception as err:
# if there is a problem with arguments, just send an error and do nothing
if get_config()['global']['debug']:
print_exc()
async with self.pool.acquire() as connection:
await connection.set_type_codec(
'json',
encoder=dumps,
decoder=loads,
schema='pg_catalog'
)
risotto_context.connection = connection
async with connection.transaction():
await log.failed(risotto_context,
str(err),
)
raise err from err
error = err
if error:
if not internal:
err = CallError(str(error))
err.context_id = risotto_context.context_id
else:
err = error
raise err from error
return ret
@ -248,6 +259,15 @@ class PublishDispatcher:
False,
False,
)
for function_obj in self.messages[version][message]['functions']:
async with self.pool.acquire() as log_connection:
await log_connection.set_type_codec(
'json',
encoder=dumps,
decoder=loads,
schema='pg_catalog'
)
risotto_context.log_connection = log_connection
async with self.pool.acquire() as connection:
await connection.set_type_codec(
'json',
@ -256,7 +276,6 @@ class PublishDispatcher:
schema='pg_catalog'
)
risotto_context.connection = connection
for function_obj in self.messages[version][message]['functions']:
function_name = function_obj['function'].__name__
info_msg = _(f"call function {function_obj['full_module_name']}.{function_name}")
try:

View File

@ -101,15 +101,31 @@ async def handle(request):
internal=False,
**kwargs,
)
except NotAllowedError as err:
raise HTTPNotFound(reason=str(err))
except CallError as err:
raise HTTPBadRequest(reason=str(err).replace('\n', ' '))
except Exception as err:
context_id = None
if isinstance(err, NotAllowedError):
error_type = HTTPNotFound
elif isinstance(err, CallError):
error_type = HTTPBadRequest
context_id = err.context_id
else:
if get_config()['global']['debug']:
print_exc()
raise HTTPInternalServerError(reason=str(err))
return Response(text=dumps({'response': text}),
error_type = HTTPInternalServerError
response = {'type': 'error',
'reason': str(err).replace('\n', ' '),
}
if context_id is not None:
response['context_id'] = context_id
err = dumps({'response': response,
'type': 'error',
})
raise error_type(text=err,
content_type='application/json',
)
return Response(text=dumps({'response': text,
'type': 'success',
}),
content_type='application/json',
)

View File

@ -13,24 +13,24 @@ class Logger:
"""
async def insert(self,
msg: str,
uri: str,
uris: str,
risotto_context: Context,
level: str,
data: Any=None,
kwargs: Any=None,
start: bool=False,
) -> None:
uri = self._get_last_uri(risotto_context)
uris = " ".join(risotto_context.paths)
insert = 'INSERT INTO RisottoLog(Msg, URI, URIS, UserLogin, Level'
values = 'VALUES($1,$2,$3,$4,$5'
args = [msg, uri, uris, risotto_context.username, level]
if data:
insert += ', Data'
if kwargs:
insert += ', Kwargs'
values += ',$6'
args.append(dumps(data))
args.append(dumps(kwargs))
context_id = risotto_context.context_id
if context_id is not None:
insert += ', ContextId'
if data:
if kwargs:
values += ',$7'
else:
values += ',$6'
@ -38,7 +38,8 @@ class Logger:
sql = insert + ') ' + values + ') RETURNING LogId'
try:
log_id = await risotto_context.connection.fetchval(sql, *args)
async with risotto_context.log_connection.transaction():
log_id = await risotto_context.log_connection.fetchval(sql, *args)
if context_id is None and start:
risotto_context.context_id = log_id
if start:
@ -51,7 +52,7 @@ class Logger:
context_id: int,
uri: Optional[str],
) -> list:
sql = '''SELECT Msg as msg, URI as uri_name, URIS as uris, UserLogin as user_login, Level as level, Data as data, StartDate as start_date, StopDate as stop_date
sql = '''SELECT Msg as msg, URI as uri_name, URIS as uris, UserLogin as user_login, Level as level, Kwargs as kwargs, Returns as returns, StartDate as start_date, StopDate as stop_date
FROM RisottoLog
WHERE UserLogin = $1 AND (LogId = $2 OR ContextId = $2)
'''
@ -60,14 +61,19 @@ class Logger:
sql += ' AND URI = $3'
args.append(uri)
ret = []
for row in await risotto_context.connection.fetch(*args):
async with risotto_context.log_connection.transaction():
for row in await risotto_context.log_connection.fetch(*args):
d = {}
for key, value in row.items():
if key == 'data':
if not value:
if key in ['kwargs', 'returns']:
if isinstance(value, dict):
pass
elif not value:
value = {}
else:
value = loads(value)
if key == 'uris':
value = value.split(' ')
elif key in ['start_date', 'stop_date']:
value = str(value)
d[key] = value
@ -109,8 +115,6 @@ class Logger:
paths_msg = self._get_message_paths(risotto_context)
print(_(f'{risotto_context.username}: ERROR: {error} ({paths_msg} with arguments "{arguments}": {msg})'))
await self.insert(msg,
self._get_last_uri(risotto_context),
paths_msg,
risotto_context,
'Error',
arguments,
@ -127,8 +131,6 @@ class Logger:
if get_config()['global']['debug']:
print(_(f'{risotto_context.username}: INFO:{paths_msg}: {msg}'))
await self.insert(msg,
self._get_last_uri(risotto_context),
paths_msg,
risotto_context,
'Info',
arguments,
@ -141,10 +143,12 @@ class Logger:
) -> None:
paths_msg = self._get_message_paths(risotto_context)
if get_config()['global']['debug']:
print(_(f'{risotto_context.username}: START:{paths_msg}: {msg}'))
if risotto_context.context_id != None:
context = f'({risotto_context.context_id})'
else:
context = ''
print(_(f'{risotto_context.username}: START{context}:{paths_msg}: {msg}'))
await self.insert(msg,
self._get_last_uri(risotto_context),
paths_msg,
risotto_context,
'Start',
arguments,
@ -157,7 +161,7 @@ class Logger:
) -> None:
if get_config()['global']['debug']:
paths_msg = self._get_message_paths(risotto_context)
print(_(f'{risotto_context.username}: SUCCESS:{paths_msg}({risotto_context.context_id})'))
print(_(f'{risotto_context.username}: SUCCESS({risotto_context.context_id}):{paths_msg}'))
sql = """UPDATE RisottoLog
SET StopDate = $2,
Level = 'SUCCESS'
@ -169,7 +173,8 @@ class Logger:
args.append(dumps(returns))
sql += """WHERE LogId = $1
"""
await risotto_context.connection.execute(sql,
async with risotto_context.log_connection.transaction():
await risotto_context.log_connection.execute(sql,
risotto_context.start_id,
*args,
)
@ -180,14 +185,19 @@ class Logger:
) -> None:
if get_config()['global']['debug']:
paths_msg = self._get_message_paths(risotto_context)
print(_(f'{risotto_context.username}: FAILED:{paths_msg}({risotto_context.context_id}): err'))
if risotto_context.context_id != None:
context = f'({risotto_context.context_id})'
else:
context = ''
print(_(f'{risotto_context.username}: FAILED({risotto_context.context_id}):{paths_msg}: {err}'))
sql = """UPDATE RisottoLog
SET StopDate = $2,
Level = 'FAILED',
Msg = $3
WHERE LogId = $1
"""
await risotto_context.connection.execute(sql,
async with risotto_context.log_connection.transaction():
await risotto_context.log_connection.execute(sql,
risotto_context.start_id,
datetime.now(),
err,
@ -200,8 +210,6 @@ class Logger:
if get_config()['global']['debug']:
print(msg)
await self.insert(msg,
'',
None,
risotto_context,
'Info',
)

View File

@ -297,6 +297,7 @@ class RegisterDispatcher:
truncate: bool=False,
) -> None:
internal_user = get_config()['global']['internal_user']
async with self.pool.acquire() as log_connection:
async with self.pool.acquire() as connection:
await connection.set_type_codec(
'json',
@ -313,6 +314,7 @@ class RegisterDispatcher:
risotto_context.username = internal_user
risotto_context.paths.append(f'internal.{submodule_name}.on_join')
risotto_context.type = None
risotto_context.log_connection = log_connection
risotto_context.connection = connection
risotto_context.module = submodule_name.split('.', 1)[0]
info_msg = _(f'in function risotto_{submodule_name}.on_join')