Compare commits

..

12 Commits

10 changed files with 248 additions and 225 deletions

View File

@ -5,7 +5,7 @@ CREATE TABLE RisottoLog(
URI VARCHAR(255),
URIS VARCHAR(255),
UserLogin VARCHAR(100) NOT NULL,
Level VARCHAR(10) NOT NULL,
Status INTEGER NOT NULL,
Kwargs JSON,
Returns JSON,
StartDate timestamp DEFAULT current_timestamp,

View File

@ -65,6 +65,18 @@ if 'CELERYRISOTTO_DB_USER' in environ:
CELERYRISOTTO_DB_USER = environ['CELERYRISOTTO_DB_USER']
else:
CELERYRISOTTO_DB_USER = config.get('CELERYRISOTTO_DB_USER', None)
if 'LEMUR_DB_NAME' in environ:
LEMUR_DB_NAME = environ['LEMUR_DB_NAME']
else:
LEMUR_DB_NAME = config.get('LEMUR_DB_NAME', None)
if 'LEMUR_DB_PASSWORD' in environ:
LEMUR_DB_PASSWORD = environ['LEMUR_DB_PASSWORD']
else:
LEMUR_DB_PASSWORD = config.get('LEMUR_DB_PASSWORD', None)
if 'LEMUR_DB_USER' in environ:
LEMUR_DB_USER = environ['LEMUR_DB_USER']
else:
LEMUR_DB_USER = config.get('LEMUR_DB_USER', None)
if 'DB_ADDRESS' in environ:
DB_ADDRESS = environ['DB_ADDRESS']
else:
@ -115,6 +127,11 @@ if 'PASSWORD_URL' in environ:
PASSWORD_URL = environ['PASSWORD_URL']
else:
PASSWORD_URL = config.get('PASSWORD_URL', 'https://localhost:8001/')
if 'PASSWORD_LENGTH' in environ:
PASSWORD_LENGTH = int(environ['PASSWORD_LENGTH'])
else:
PASSWORD_LENGTH = int(config.get('PASSWORD_LENGTH', 20))
if 'PKI_ADMIN_PASSWORD' in environ:
PKI_ADMIN_PASSWORD = environ['PKI_ADMIN_PASSWORD']
else:
@ -136,7 +153,8 @@ def dsn_factory(database, user, password, address=DB_ADDRESS):
_config = {'database': {'dsn': dsn_factory(RISOTTO_DB_NAME, RISOTTO_DB_USER, RISOTTO_DB_PASSWORD),
'tiramisu_dsn': dsn_factory(TIRAMISU_DB_NAME, TIRAMISU_DB_USER, TIRAMISU_DB_PASSWORD),
'celery_dsn': dsn_factory(CELERYRISOTTO_DB_NAME, CELERYRISOTTO_DB_USER, CELERYRISOTTO_DB_PASSWORD)
'celery_dsn': dsn_factory(CELERYRISOTTO_DB_NAME, CELERYRISOTTO_DB_USER, CELERYRISOTTO_DB_PASSWORD),
'lemur_dns': dsn_factory(LEMUR_DB_NAME, LEMUR_DB_USER, LEMUR_DB_PASSWORD),
},
'http_server': {'port': RISOTTO_PORT,
'default_user': DEFAULT_USER,
@ -155,6 +173,7 @@ _config = {'database': {'dsn': dsn_factory(RISOTTO_DB_NAME, RISOTTO_DB_USER, RIS
'admin_password': PASSWORD_ADMIN_PASSWORD,
'device_identifier': PASSWORD_DEVICE_IDENTIFIER,
'service_url': PASSWORD_URL,
'length': PASSWORD_LENGTH,
},
'pki': {'admin_password': PKI_ADMIN_PASSWORD,
'owner': PKI_ADMIN_EMAIL,

View File

@ -3,3 +3,11 @@ class Context:
self.paths = []
self.context_id = None
self.start_id = None
def copy(self):
context = Context()
for key, value in self.__dict__.items():
if key.startswith('__'):
continue
setattr(context, key, value)
return context

View File

@ -309,6 +309,7 @@ except:
await config_std.property.read_only()
# copy informations from 'to deploy' configuration to configuration
await config.information.importation(await config_std.information.exportation())
await config.value.importation(await config_std.value.exportation())
await config.permissive.importation(await config_std.permissive.exportation())
await config.property.importation(await config_std.property.exportation())

View File

@ -42,14 +42,21 @@ class CallDispatcher:
for ret in returns:
async with await Config(response, display_name=lambda self, dyn_name, suffix: self.impl_getname()) as config:
await config.property.read_write()
key = None
try:
for key, value in ret.items():
await config.option(key).value.set(value)
except AttributeError:
err = _(f'function {module_name}.{function_name} return the unknown parameter "{key}" for the uri "{risotto_context.version}.{risotto_context.message}"')
except AttributeError as err:
if key is not None:
err = _(f'function {module_name}.{function_name} return the unknown parameter "{key}" for the uri "{risotto_context.version}.{risotto_context.message}"')
else:
err = _(f'function {module_name}.{function_name} return unconsistency data "{err}" for the uri "{risotto_context.version}.{risotto_context.message}"')
raise CallError(err)
except ValueError as err:
err = _(f'function {module_name}.{function_name} return the invalid parameter "{key}" for the uri "{risotto_context.version}.{risotto_context.message}": {err}')
if key is not None:
err = _(f'function {module_name}.{function_name} return the invalid parameter "{key}" for the uri "{risotto_context.version}.{risotto_context.message}": {err}')
else:
err = _(f'function {module_name}.{function_name} return unconsistency error for the uri "{risotto_context.version}.{risotto_context.message}": {err}')
raise CallError(err)
await config.property.read_only()
mandatories = await config.value.mandatory()
@ -89,7 +96,6 @@ class CallDispatcher:
if hasattr(old_risotto_context, 'connection'):
# do not start a new database connection
risotto_context.connection = old_risotto_context.connection
risotto_context.log_connection = old_risotto_context.log_connection
await log.start(risotto_context,
kwargs,
info_msg,
@ -119,66 +125,58 @@ class CallDispatcher:
raise CallError(err) from err
else:
error = None
async with self.pool.acquire() as log_connection:
await log_connection.set_type_codec(
'json',
encoder=dumps,
decoder=loads,
schema='pg_catalog'
)
risotto_context.log_connection = log_connection
try:
async with self.pool.acquire() as connection:
await connection.set_type_codec(
'json',
encoder=dumps,
decoder=loads,
schema='pg_catalog'
)
risotto_context.connection = connection
async with connection.transaction():
try:
await log.start(risotto_context,
kwargs,
info_msg,
)
await self.check_message_type(risotto_context,
kwargs,
)
config_arguments = await self.load_kwargs_to_config(risotto_context,
f'{version}.{message}',
kwargs,
check_role,
internal,
)
ret = await self.launch(risotto_context,
kwargs,
config_arguments,
function_obj,
)
# log the success
await log.success(risotto_context,
ret,
)
if not internal and isinstance(ret, dict):
ret['context_id'] = risotto_context.context_id
except CallError as err:
if get_config()['global']['debug']:
print_exc()
await log.failed(risotto_context,
str(err),
)
raise err from err
except CallError as err:
error = err
except Exception as err:
# if there is a problem with arguments, just send an error and do nothing
if get_config()['global']['debug']:
print_exc()
await log.failed(risotto_context,
str(err),
)
error = err
try:
async with self.pool.acquire() as connection:
await connection.set_type_codec(
'json',
encoder=dumps,
decoder=loads,
schema='pg_catalog'
)
risotto_context.connection = connection
async with connection.transaction():
try:
await log.start(risotto_context,
kwargs,
info_msg,
)
await self.check_message_type(risotto_context,
kwargs,
)
config_arguments = await self.load_kwargs_to_config(risotto_context,
f'{version}.{message}',
kwargs,
check_role,
internal,
)
ret = await self.launch(risotto_context,
kwargs,
config_arguments,
function_obj,
)
# log the success
await log.success(risotto_context,
ret,
)
if not internal and isinstance(ret, dict):
ret['context_id'] = risotto_context.context_id
except CallError as err:
if get_config()['global']['debug']:
print_exc()
await log.failed(risotto_context,
str(err),
)
raise err from err
except CallError as err:
error = err
except Exception as err:
# if there is a problem with arguments, just send an error and do nothing
if get_config()['global']['debug']:
print_exc()
await log.failed(risotto_context,
str(err),
)
error = err
if error:
if not internal:
err = CallError(str(error))
@ -198,8 +196,6 @@ class PublishDispatcher:
for message, message_infos in messages.items():
# event not emit locally
if message_infos['pattern'] == 'event' and 'functions' in message_infos and message_infos['functions']:
# module, submodule, submessage = message.split('.', 2)
# if f'{module}.{submodule}' not in self.injected_self:
uri = f'{version}.{message}'
print(f' - {uri}')
await self.listened_connection.add_listener(uri,
@ -235,21 +231,34 @@ class PublishDispatcher:
version, message = uri.split('.', 1)
loop = get_event_loop()
remote_kw = loads(payload)
risotto_context = self.build_new_context(remote_kw['context'],
version,
message,
'event',
)
callback = lambda: ensure_future(self._publish(version,
message,
risotto_context,
**remote_kw['kwargs'],
))
loop.call_soon(callback)
for function_obj in self.messages[version][message]['functions']:
risotto_context = self.build_new_context(remote_kw['context'],
version,
message,
'event',
)
callback = self.get_callback(version, message, function_obj, risotto_context, remote_kw['kwargs'],)
loop.call_soon(callback)
def get_callback(self,
version,
message,
function_obj,
risotto_context,
kwargs,
):
return lambda: ensure_future(self._publish(version,
message,
function_obj,
risotto_context,
**kwargs,
))
async def _publish(self,
version: str,
message: str,
function_obj,
risotto_context: Context,
**kwargs,
) -> None:
@ -259,66 +268,48 @@ class PublishDispatcher:
False,
False,
)
for function_obj in self.messages[version][message]['functions']:
async with self.pool.acquire() as log_connection:
await log_connection.set_type_codec(
'json',
encoder=dumps,
decoder=loads,
schema='pg_catalog'
)
risotto_context.log_connection = log_connection
async with self.pool.acquire() as connection:
await connection.set_type_codec(
'json',
encoder=dumps,
decoder=loads,
schema='pg_catalog'
)
risotto_context.connection = connection
function_name = function_obj['function'].__name__
info_msg = _(f"call function {function_obj['full_module_name']}.{function_name}")
async with self.pool.acquire() as connection:
await connection.set_type_codec(
'json',
encoder=dumps,
decoder=loads,
schema='pg_catalog'
)
risotto_context.connection = connection
function_name = function_obj['function'].__name__
info_msg = _(f"call function {function_obj['full_module_name']}.{function_name}")
try:
async with connection.transaction():
try:
async with connection.transaction():
try:
await log.start(risotto_context,
kwargs,
info_msg,
)
await self.check_message_type(risotto_context,
kwargs,
)
await self.launch(risotto_context,
kwargs,
config_arguments,
function_obj,
)
# log the success
await log.success(risotto_context)
except CallError as err:
if get_config()['global']['debug']:
print_exc()
await log.failed(risotto_context,
str(err),
)
except CallError:
pass
except Exception as err:
# if there is a problem with arguments, log and do nothing
await log.start(risotto_context,
kwargs,
info_msg,
)
await self.check_message_type(risotto_context,
kwargs,
)
await self.launch(risotto_context,
kwargs,
config_arguments,
function_obj,
)
# log the success
await log.success(risotto_context)
except CallError as err:
if get_config()['global']['debug']:
print_exc()
async with self.pool.acquire() as connection:
await connection.set_type_codec(
'json',
encoder=dumps,
decoder=loads,
schema='pg_catalog'
)
risotto_context.connection = connection
async with connection.transaction():
await log.failed(risotto_context,
str(err),
)
await log.failed(risotto_context,
str(err),
)
except CallError:
pass
except Exception as err:
# if there is a problem with arguments, log and do nothing
if get_config()['global']['debug']:
print_exc()
await log.failed(risotto_context,
str(err),
)
class Dispatcher(register.RegisterDispatcher,
@ -346,6 +337,7 @@ class Dispatcher(register.RegisterDispatcher,
risotto_context.type = type
risotto_context.message = message
risotto_context.version = version
risotto_context.pool = self.pool
return risotto_context
async def check_message_type(self,
@ -483,6 +475,8 @@ class Dispatcher(register.RegisterDispatcher,
)
# notification
if function_obj.get('notification'):
if returns is None:
raise Exception(_(f'function "{function_obj["full_module_name"]}.{function_obj["function"].__name__}" must returns something for {function_obj["notification"]}!'))
notif_version, notif_message = function_obj['notification'].split('.', 1)
if not isinstance(returns, list):
send_returns = [returns]

View File

@ -23,9 +23,13 @@ extra_statics = {}
def create_context(request):
risotto_context = Context()
risotto_context.username = request.match_info.get('username',
get_config()['http_server']['default_user'],
)
if 'username' in dict(request.match_info):
username = request.match_info['username']
elif 'username' in request.headers:
username = request.headers['username']
else:
username = get_config()['http_server']['default_user']
risotto_context.username = username
return risotto_context

View File

@ -2,15 +2,37 @@ from typing import Dict, Any, Optional
from json import dumps, loads
from asyncpg.exceptions import UndefinedTableError
from datetime import datetime
from asyncio import Lock
from .context import Context
from .utils import _
from .config import get_config
database_lock = Lock()
LEVELS = ['Error', 'Info', 'Success', 'Started', 'Failure']
class Logger:
""" An object to manager log
"""
def __init__(self) -> None:
self.log_connection = None
async def get_connection(self,
risotto_context: Context,
):
if not self.log_connection:
self.log_connection = await risotto_context.pool.acquire()
await self.log_connection.set_type_codec(
'json',
encoder=dumps,
decoder=loads,
schema='pg_catalog'
)
return self.log_connection
async def insert(self,
msg: str,
risotto_context: Context,
@ -20,9 +42,9 @@ class Logger:
) -> None:
uri = self._get_last_uri(risotto_context)
uris = " ".join(risotto_context.paths)
insert = 'INSERT INTO RisottoLog(Msg, URI, URIS, UserLogin, Level'
insert = 'INSERT INTO RisottoLog(Msg, URI, URIS, UserLogin, Status'
values = 'VALUES($1,$2,$3,$4,$5'
args = [msg, uri, uris, risotto_context.username, level]
args = [msg, uri, uris, risotto_context.username, LEVELS.index(level)]
if kwargs:
insert += ', Kwargs'
values += ',$6'
@ -38,8 +60,9 @@ class Logger:
sql = insert + ') ' + values + ') RETURNING LogId'
try:
async with risotto_context.log_connection.transaction():
log_id = await risotto_context.log_connection.fetchval(sql, *args)
async with database_lock:
connection = await self.get_connection(risotto_context)
log_id = await connection.fetchval(sql, *args)
if context_id is None and start:
risotto_context.context_id = log_id
if start:
@ -47,39 +70,6 @@ class Logger:
except UndefinedTableError as err:
raise Exception(_(f'cannot access to database ({err}), was the database really created?'))
async def query(self,
risotto_context: Context,
context_id: int,
uri: Optional[str],
) -> list:
sql = '''SELECT Msg as msg, URI as uri_name, URIS as uris, UserLogin as user_login, Level as level, Kwargs as kwargs, Returns as returns, StartDate as start_date, StopDate as stop_date
FROM RisottoLog
WHERE UserLogin = $1 AND (LogId = $2 OR ContextId = $2)
'''
args = [sql, risotto_context.username, context_id]
if uri is not None:
sql += ' AND URI = $3'
args.append(uri)
ret = []
async with risotto_context.log_connection.transaction():
for row in await risotto_context.log_connection.fetch(*args):
d = {}
for key, value in row.items():
if key in ['kwargs', 'returns']:
if isinstance(value, dict):
pass
elif not value:
value = {}
else:
value = loads(value)
if key == 'uris':
value = value.split(' ')
elif key in ['start_date', 'stop_date']:
value = str(value)
d[key] = value
ret.append(d)
return ret
def _get_last_uri(self,
risotto_context: Context,
) -> str:
@ -150,7 +140,7 @@ class Logger:
print(_(f'{risotto_context.username}: START{context}:{paths_msg}: {msg}'))
await self.insert(msg,
risotto_context,
'Start',
'Started',
arguments,
start=True,
)
@ -164,20 +154,21 @@ class Logger:
print(_(f'{risotto_context.username}: SUCCESS({risotto_context.context_id}):{paths_msg}'))
sql = """UPDATE RisottoLog
SET StopDate = $2,
Level = 'SUCCESS'
Status = $3
"""
args = [datetime.now()]
args = [datetime.now(), LEVELS.index('Success')]
if returns:
sql += """, Returns = $3
sql += """, Returns = $4
"""
args.append(dumps(returns))
sql += """WHERE LogId = $1
"""
async with risotto_context.log_connection.transaction():
await risotto_context.log_connection.execute(sql,
risotto_context.start_id,
*args,
)
async with database_lock:
connection = await self.get_connection(risotto_context)
await connection.execute(sql,
risotto_context.start_id,
*args,
)
async def failed(self,
risotto_context: Context,
@ -192,16 +183,18 @@ class Logger:
print(_(f'{risotto_context.username}: FAILED({risotto_context.context_id}):{paths_msg}: {err}'))
sql = """UPDATE RisottoLog
SET StopDate = $2,
Level = 'FAILED',
Status = $4,
Msg = $3
WHERE LogId = $1
"""
async with risotto_context.log_connection.transaction():
await risotto_context.log_connection.execute(sql,
risotto_context.start_id,
datetime.now(),
err,
)
async with database_lock:
connection = await self.get_connection(risotto_context)
await connection.execute(sql,
risotto_context.start_id,
datetime.now(),
err[:254],
LEVELS.index('Failure'),
)
async def info(self,
risotto_context,

View File

@ -313,6 +313,7 @@ class CustomParam:
'string': 'String',
'number': 'Number',
'object': 'Dict',
'any': 'Any',
'array': 'Array',
'file': 'File',
'float': 'Float'}
@ -448,6 +449,7 @@ def _get_option(name,
'reverse_condition': ParamValue(True)}),
calc_value_property_help))
props.append('notunique')
description = arg.description.strip().rstrip()
kwargs = {'name': name,
'doc': _get_description(description, name),
@ -523,6 +525,7 @@ def _parse_responses(message_def,
'Number': IntOption,
'Boolean': BoolOption,
'Dict': DictOption,
'Any': AnyOption,
'Float': FloatOption,
# FIXME
'File': StrOption}.get(type_)
@ -530,8 +533,9 @@ def _parse_responses(message_def,
raise Exception(f'unknown param type {obj.type} in responses of message {message_def.message}')
if hasattr(obj, 'default'):
kwargs['default'] = obj.default
kwargs['properties'] = ('notunique',)
else:
kwargs['properties'] = ('mandatory',)
kwargs['properties'] = ('mandatory', 'notunique')
options.append(option(**kwargs))
od = OptionDescription(uri,
message_def.response.description,
@ -600,7 +604,7 @@ def get_messages(current_module_names,
select_option = ChoiceOption('message',
'Nom du message.',
tuple(messages),
properties=frozenset(['mandatory', 'positional']))
properties=frozenset(['mandatory', 'positional', 'notunique']))
for uri in messages:
message_def = get_message(uri,
current_module_names,

View File

@ -107,6 +107,7 @@ class RegisterDispatcher:
version = obj['version']
if version not in self.messages:
self.messages[version] = {}
obj['message'] = tiramisu_message
self.messages[version][tiramisu_message] = obj
def get_function_args(self,
@ -297,37 +298,36 @@ class RegisterDispatcher:
truncate: bool=False,
) -> None:
internal_user = get_config()['global']['internal_user']
async with self.pool.acquire() as log_connection:
async with self.pool.acquire() as connection:
await connection.set_type_codec(
'json',
encoder=dumps,
decoder=loads,
schema='pg_catalog'
)
if truncate:
async with connection.transaction():
await connection.execute('TRUNCATE InfraServer, InfraSite, InfraZone, Log, ProviderDeployment, ProviderFactoryCluster, ProviderFactoryClusterNode, SettingApplicationservice, SettingApplicationServiceDependency, SettingRelease, SettingServer, SettingServermodel, SettingSource, UserRole, UserRoleURI, UserURI, UserUser, InfraServermodel, ProviderZone, ProviderServer, ProviderSource, ProviderApplicationservice, ProviderServermodel')
async with self.pool.acquire() as connection:
await connection.set_type_codec(
'json',
encoder=dumps,
decoder=loads,
schema='pg_catalog'
)
if truncate:
async with connection.transaction():
for submodule_name, module in self.injected_self.items():
risotto_context = Context()
risotto_context.username = internal_user
risotto_context.paths.append(f'internal.{submodule_name}.on_join')
risotto_context.type = None
risotto_context.log_connection = log_connection
risotto_context.connection = connection
risotto_context.module = submodule_name.split('.', 1)[0]
info_msg = _(f'in function risotto_{submodule_name}.on_join')
await log.info_msg(risotto_context,
None,
info_msg)
try:
await module.on_join(risotto_context)
except Exception as err:
if get_config()['global']['debug']:
print_exc()
msg = _(f'on_join returns an error in module {submodule_name}: {err}')
await log.error_msg(risotto_context, {}, msg)
await connection.execute('TRUNCATE InfraServer, InfraSite, InfraZone, Log, ProviderDeployment, ProviderFactoryCluster, ProviderFactoryClusterNode, SettingApplicationservice, SettingApplicationServiceDependency, SettingRelease, SettingServer, SettingServermodel, SettingSource, UserRole, UserRoleURI, UserURI, UserUser, InfraServermodel, ProviderZone, ProviderServer, ProviderSource, ProviderApplicationservice, ProviderServermodel')
async with connection.transaction():
for submodule_name, module in self.injected_self.items():
risotto_context = Context()
risotto_context.username = internal_user
risotto_context.paths.append(f'internal.{submodule_name}.on_join')
risotto_context.type = None
risotto_context.pool = self.pool
risotto_context.connection = connection
risotto_context.module = submodule_name.split('.', 1)[0]
info_msg = _(f'in function risotto_{submodule_name}.on_join')
await log.info_msg(risotto_context,
None,
info_msg)
try:
await module.on_join(risotto_context)
except Exception as err:
if get_config()['global']['debug']:
print_exc()
msg = _(f'on_join returns an error in module {submodule_name}: {err}')
await log.error_msg(risotto_context, {}, msg)
async def load(self):
# valid function's arguments

View File

@ -23,5 +23,5 @@ def tiramisu_display_name(kls,
if suffix:
doc += suffix
if name != doc:
name += f' ({doc})'
name += f'" "{doc}'
return name