Compare commits

..

8 Commits

6 changed files with 218 additions and 202 deletions

View File

@ -1,12 +1,12 @@
CREATE TABLE RisottoLog(
LogId SERIAL PRIMARY KEY,
ContextId INTEGER,
Msg VARCHAR(255) NOT NULL,
URI VARCHAR(255),
URIS VARCHAR(255),
UserLogin VARCHAR(100) NOT NULL,
Level VARCHAR(10) NOT NULL,
ContextId INTEGER,
Data JSON,
Kwargs JSON,
Returns JSON,
StartDate timestamp DEFAULT current_timestamp,
StopDate timestamp

View File

@ -115,6 +115,11 @@ if 'PASSWORD_URL' in environ:
PASSWORD_URL = environ['PASSWORD_URL']
else:
PASSWORD_URL = config.get('PASSWORD_URL', 'https://localhost:8001/')
if 'PASSWORD_LENGTH' in environ:
PASSWORD_LENGTH = int(environ['PASSWORD_LENGTH'])
else:
PASSWORD_LENGTH = int(config.get('PASSWORD_LENGTH', 20))
if 'PKI_ADMIN_PASSWORD' in environ:
PKI_ADMIN_PASSWORD = environ['PKI_ADMIN_PASSWORD']
else:
@ -155,6 +160,7 @@ _config = {'database': {'dsn': dsn_factory(RISOTTO_DB_NAME, RISOTTO_DB_USER, RIS
'admin_password': PASSWORD_ADMIN_PASSWORD,
'device_identifier': PASSWORD_DEVICE_IDENTIFIER,
'service_url': PASSWORD_URL,
'length': PASSWORD_LENGTH,
},
'pki': {'admin_password': PKI_ADMIN_PASSWORD,
'owner': PKI_ADMIN_EMAIL,

View File

@ -3,3 +3,11 @@ class Context:
self.paths = []
self.context_id = None
self.start_id = None
def copy(self):
context = Context()
for key, value in self.__dict__.items():
if key.startswith('__'):
continue
setattr(context, key, value)
return context

View File

@ -89,7 +89,6 @@ class CallDispatcher:
if hasattr(old_risotto_context, 'connection'):
# do not start a new database connection
risotto_context.connection = old_risotto_context.connection
risotto_context.log_connection = old_risotto_context.log_connection
await log.start(risotto_context,
kwargs,
info_msg,
@ -119,67 +118,58 @@ class CallDispatcher:
raise CallError(err) from err
else:
error = None
async with self.pool.acquire() as log_connection:
await log_connection.set_type_codec(
'json',
encoder=dumps,
decoder=loads,
schema='pg_catalog'
)
async with log_connection.transaction():
try:
risotto_context.log_connection = log_connection
async with self.pool.acquire() as connection:
await connection.set_type_codec(
'json',
encoder=dumps,
decoder=loads,
schema='pg_catalog'
)
risotto_context.connection = connection
async with connection.transaction():
try:
await log.start(risotto_context,
try:
async with self.pool.acquire() as connection:
await connection.set_type_codec(
'json',
encoder=dumps,
decoder=loads,
schema='pg_catalog'
)
risotto_context.connection = connection
async with connection.transaction():
try:
await log.start(risotto_context,
kwargs,
info_msg,
)
await self.check_message_type(risotto_context,
kwargs,
)
config_arguments = await self.load_kwargs_to_config(risotto_context,
f'{version}.{message}',
kwargs,
check_role,
internal,
)
ret = await self.launch(risotto_context,
kwargs,
info_msg,
config_arguments,
function_obj,
)
await self.check_message_type(risotto_context,
kwargs,
)
config_arguments = await self.load_kwargs_to_config(risotto_context,
f'{version}.{message}',
kwargs,
check_role,
internal,
)
ret = await self.launch(risotto_context,
kwargs,
config_arguments,
function_obj,
)
# log the success
await log.success(risotto_context,
ret,
)
if not internal and isinstance(ret, dict):
ret['context_id'] = risotto_context.context_id
except CallError as err:
if get_config()['global']['debug']:
print_exc()
await log.failed(risotto_context,
str(err),
)
raise err from err
except CallError as err:
error = err
except Exception as err:
# if there is a problem with arguments, just send an error and do nothing
if get_config()['global']['debug']:
print_exc()
await log.failed(risotto_context,
str(err),
)
error = err
# log the success
await log.success(risotto_context,
ret,
)
if not internal and isinstance(ret, dict):
ret['context_id'] = risotto_context.context_id
except CallError as err:
if get_config()['global']['debug']:
print_exc()
await log.failed(risotto_context,
str(err),
)
raise err from err
except CallError as err:
error = err
except Exception as err:
# if there is a problem with arguments, just send an error and do nothing
if get_config()['global']['debug']:
print_exc()
await log.failed(risotto_context,
str(err),
)
error = err
if error:
if not internal:
err = CallError(str(error))
@ -199,8 +189,6 @@ class PublishDispatcher:
for message, message_infos in messages.items():
# event not emit locally
if message_infos['pattern'] == 'event' and 'functions' in message_infos and message_infos['functions']:
# module, submodule, submessage = message.split('.', 2)
# if f'{module}.{submodule}' not in self.injected_self:
uri = f'{version}.{message}'
print(f' - {uri}')
await self.listened_connection.add_listener(uri,
@ -236,21 +224,34 @@ class PublishDispatcher:
version, message = uri.split('.', 1)
loop = get_event_loop()
remote_kw = loads(payload)
risotto_context = self.build_new_context(remote_kw['context'],
version,
message,
'event',
)
callback = lambda: ensure_future(self._publish(version,
message,
risotto_context,
**remote_kw['kwargs'],
))
loop.call_soon(callback)
for function_obj in self.messages[version][message]['functions']:
risotto_context = self.build_new_context(remote_kw['context'],
version,
message,
'event',
)
callback = self.get_callback(version, message, function_obj, risotto_context, remote_kw['kwargs'],)
loop.call_soon(callback)
def get_callback(self,
version,
message,
function_obj,
risotto_context,
kwargs,
):
return lambda: ensure_future(self._publish(version,
message,
function_obj,
risotto_context,
**kwargs,
))
async def _publish(self,
version: str,
message: str,
function_obj,
risotto_context: Context,
**kwargs,
) -> None:
@ -260,67 +261,48 @@ class PublishDispatcher:
False,
False,
)
async with self.pool.acquire() as log_connection:
await log_connection.set_type_codec(
async with self.pool.acquire() as connection:
await connection.set_type_codec(
'json',
encoder=dumps,
decoder=loads,
schema='pg_catalog'
)
async with log_connection.transaction():
risotto_context.log_connection = log_connection
async with self.pool.acquire() as connection:
await connection.set_type_codec(
'json',
encoder=dumps,
decoder=loads,
schema='pg_catalog'
)
risotto_context.connection = connection
for function_obj in self.messages[version][message]['functions']:
function_name = function_obj['function'].__name__
info_msg = _(f"call function {function_obj['full_module_name']}.{function_name}")
try:
async with connection.transaction():
try:
await log.start(risotto_context,
kwargs,
info_msg,
)
await self.check_message_type(risotto_context,
kwargs,
)
await self.launch(risotto_context,
risotto_context.connection = connection
function_name = function_obj['function'].__name__
info_msg = _(f"call function {function_obj['full_module_name']}.{function_name}")
try:
async with connection.transaction():
try:
await log.start(risotto_context,
kwargs,
info_msg,
)
await self.check_message_type(risotto_context,
kwargs,
config_arguments,
function_obj,
)
# log the success
await log.success(risotto_context)
except CallError as err:
if get_config()['global']['debug']:
print_exc()
await log.failed(risotto_context,
str(err),
)
except CallError:
pass
except Exception as err:
# if there is a problem with arguments, log and do nothing
if get_config()['global']['debug']:
print_exc()
async with self.pool.acquire() as connection:
await connection.set_type_codec(
'json',
encoder=dumps,
decoder=loads,
schema='pg_catalog'
)
risotto_context.connection = connection
async with connection.transaction():
await log.failed(risotto_context,
str(err),
)
await self.launch(risotto_context,
kwargs,
config_arguments,
function_obj,
)
# log the success
await log.success(risotto_context)
except CallError as err:
if get_config()['global']['debug']:
print_exc()
await log.failed(risotto_context,
str(err),
)
except CallError:
pass
except Exception as err:
# if there is a problem with arguments, log and do nothing
if get_config()['global']['debug']:
print_exc()
await log.failed(risotto_context,
str(err),
)
class Dispatcher(register.RegisterDispatcher,
@ -348,6 +330,7 @@ class Dispatcher(register.RegisterDispatcher,
risotto_context.type = type
risotto_context.message = message
risotto_context.version = version
risotto_context.pool = self.pool
return risotto_context
async def check_message_type(self,

View File

@ -2,35 +2,54 @@ from typing import Dict, Any, Optional
from json import dumps, loads
from asyncpg.exceptions import UndefinedTableError
from datetime import datetime
from asyncio import Lock
from .context import Context
from .utils import _
from .config import get_config
database_lock = Lock()
class Logger:
""" An object to manager log
"""
def __init__(self) -> None:
self.log_connection = None
async def get_connection(self,
risotto_context: Context,
):
if not self.log_connection:
self.log_connection = await risotto_context.pool.acquire()
await self.log_connection.set_type_codec(
'json',
encoder=dumps,
decoder=loads,
schema='pg_catalog'
)
return self.log_connection
async def insert(self,
msg: str,
uri: str,
uris: str,
risotto_context: Context,
level: str,
data: Any=None,
kwargs: Any=None,
start: bool=False,
) -> None:
uri = self._get_last_uri(risotto_context)
uris = " ".join(risotto_context.paths)
insert = 'INSERT INTO RisottoLog(Msg, URI, URIS, UserLogin, Level'
values = 'VALUES($1,$2,$3,$4,$5'
args = [msg, uri, uris, risotto_context.username, level]
if data:
insert += ', Data'
if kwargs:
insert += ', Kwargs'
values += ',$6'
args.append(dumps(data))
args.append(dumps(kwargs))
context_id = risotto_context.context_id
if context_id is not None:
insert += ', ContextId'
if data:
if kwargs:
values += ',$7'
else:
values += ',$6'
@ -38,7 +57,9 @@ class Logger:
sql = insert + ') ' + values + ') RETURNING LogId'
try:
log_id = await risotto_context.log_connection.fetchval(sql, *args)
async with database_lock:
connection = await self.get_connection(risotto_context)
log_id = await connection.fetchval(sql, *args)
if context_id is None and start:
risotto_context.context_id = log_id
if start:
@ -51,7 +72,7 @@ class Logger:
context_id: int,
uri: Optional[str],
) -> list:
sql = '''SELECT Msg as msg, URI as uri_name, URIS as uris, UserLogin as user_login, Level as level, Data as data, StartDate as start_date, StopDate as stop_date
sql = '''SELECT Msg as msg, URI as uri_name, URIS as uris, UserLogin as user_login, Level as level, Kwargs as kwargs, Returns as returns, StartDate as start_date, StopDate as stop_date
FROM RisottoLog
WHERE UserLogin = $1 AND (LogId = $2 OR ContextId = $2)
'''
@ -60,20 +81,24 @@ class Logger:
sql += ' AND URI = $3'
args.append(uri)
ret = []
for row in await risotto_context.log_connection.fetch(*args):
d = {}
for key, value in row.items():
if key == 'data':
if isinstance(value, dict):
pass
elif not value:
value = {}
else:
value = loads(value)
elif key in ['start_date', 'stop_date']:
value = str(value)
d[key] = value
ret.append(d)
async with database_lock:
connection = await self.get_connection(risotto_context)
for row in await connection.fetch(*args):
d = {}
for key, value in row.items():
if key in ['kwargs', 'returns']:
if isinstance(value, dict):
pass
elif not value:
value = {}
else:
value = loads(value)
if key == 'uris':
value = value.split(' ')
elif key in ['start_date', 'stop_date']:
value = str(value)
d[key] = value
ret.append(d)
return ret
def _get_last_uri(self,
@ -111,8 +136,6 @@ class Logger:
paths_msg = self._get_message_paths(risotto_context)
print(_(f'{risotto_context.username}: ERROR: {error} ({paths_msg} with arguments "{arguments}": {msg})'))
await self.insert(msg,
self._get_last_uri(risotto_context),
paths_msg,
risotto_context,
'Error',
arguments,
@ -129,8 +152,6 @@ class Logger:
if get_config()['global']['debug']:
print(_(f'{risotto_context.username}: INFO:{paths_msg}: {msg}'))
await self.insert(msg,
self._get_last_uri(risotto_context),
paths_msg,
risotto_context,
'Info',
arguments,
@ -149,8 +170,6 @@ class Logger:
context = ''
print(_(f'{risotto_context.username}: START{context}:{paths_msg}: {msg}'))
await self.insert(msg,
self._get_last_uri(risotto_context),
paths_msg,
risotto_context,
'Start',
arguments,
@ -175,10 +194,12 @@ class Logger:
args.append(dumps(returns))
sql += """WHERE LogId = $1
"""
await risotto_context.log_connection.execute(sql,
risotto_context.start_id,
*args,
)
async with database_lock:
connection = await self.get_connection(risotto_context)
await connection.execute(sql,
risotto_context.start_id,
*args,
)
async def failed(self,
risotto_context: Context,
@ -190,18 +211,20 @@ class Logger:
context = f'({risotto_context.context_id})'
else:
context = ''
print(_(f'{risotto_context.username}: FAILED({risotto_context.context_id}):{paths_msg}: err'))
print(_(f'{risotto_context.username}: FAILED({risotto_context.context_id}):{paths_msg}: {err}'))
sql = """UPDATE RisottoLog
SET StopDate = $2,
Level = 'FAILED',
Msg = $3
WHERE LogId = $1
"""
await risotto_context.log_connection.execute(sql,
risotto_context.start_id,
datetime.now(),
err,
)
async with database_lock:
connection = await self.get_connection(risotto_context)
await connection.execute(sql,
risotto_context.start_id,
datetime.now(),
err,
)
async def info(self,
risotto_context,
@ -210,8 +233,6 @@ class Logger:
if get_config()['global']['debug']:
print(msg)
await self.insert(msg,
'',
None,
risotto_context,
'Info',
)

View File

@ -297,38 +297,36 @@ class RegisterDispatcher:
truncate: bool=False,
) -> None:
internal_user = get_config()['global']['internal_user']
async with self.pool.acquire() as log_connection:
async with log_connection.transaction():
async with self.pool.acquire() as connection:
await connection.set_type_codec(
'json',
encoder=dumps,
decoder=loads,
schema='pg_catalog'
)
if truncate:
async with connection.transaction():
await connection.execute('TRUNCATE InfraServer, InfraSite, InfraZone, Log, ProviderDeployment, ProviderFactoryCluster, ProviderFactoryClusterNode, SettingApplicationservice, SettingApplicationServiceDependency, SettingRelease, SettingServer, SettingServermodel, SettingSource, UserRole, UserRoleURI, UserURI, UserUser, InfraServermodel, ProviderZone, ProviderServer, ProviderSource, ProviderApplicationservice, ProviderServermodel')
async with connection.transaction():
for submodule_name, module in self.injected_self.items():
risotto_context = Context()
risotto_context.username = internal_user
risotto_context.paths.append(f'internal.{submodule_name}.on_join')
risotto_context.type = None
risotto_context.log_connection = log_connection
risotto_context.connection = connection
risotto_context.module = submodule_name.split('.', 1)[0]
info_msg = _(f'in function risotto_{submodule_name}.on_join')
await log.info_msg(risotto_context,
None,
info_msg)
try:
await module.on_join(risotto_context)
except Exception as err:
if get_config()['global']['debug']:
print_exc()
msg = _(f'on_join returns an error in module {submodule_name}: {err}')
await log.error_msg(risotto_context, {}, msg)
async with self.pool.acquire() as connection:
await connection.set_type_codec(
'json',
encoder=dumps,
decoder=loads,
schema='pg_catalog'
)
if truncate:
async with connection.transaction():
await connection.execute('TRUNCATE InfraServer, InfraSite, InfraZone, Log, ProviderDeployment, ProviderFactoryCluster, ProviderFactoryClusterNode, SettingApplicationservice, SettingApplicationServiceDependency, SettingRelease, SettingServer, SettingServermodel, SettingSource, UserRole, UserRoleURI, UserURI, UserUser, InfraServermodel, ProviderZone, ProviderServer, ProviderSource, ProviderApplicationservice, ProviderServermodel')
async with connection.transaction():
for submodule_name, module in self.injected_self.items():
risotto_context = Context()
risotto_context.username = internal_user
risotto_context.paths.append(f'internal.{submodule_name}.on_join')
risotto_context.type = None
risotto_context.pool = self.pool
risotto_context.connection = connection
risotto_context.module = submodule_name.split('.', 1)[0]
info_msg = _(f'in function risotto_{submodule_name}.on_join')
await log.info_msg(risotto_context,
None,
info_msg)
try:
await module.on_join(risotto_context)
except Exception as err:
if get_config()['global']['debug']:
print_exc()
msg = _(f'on_join returns an error in module {submodule_name}: {err}')
await log.error_msg(risotto_context, {}, msg)
async def load(self):
# valid function's arguments