Compare commits

..

31 Commits

Author SHA1 Message Date
b0edfb7b01 Merge branch 'develop' into dist/risotto/risotto-2.8.0/develop 2021-04-24 14:16:11 +02:00
27031dbf0e log_connexion 2021-04-24 14:15:54 +02:00
cb4dde1dc4 Merge branch 'develop' into dist/risotto/risotto-2.8.0/develop 2021-04-24 12:56:52 +02:00
9ebe79d533 special connexion for log (do not rollback if error) 2021-04-24 12:56:44 +02:00
c740ec3fe3 Merge branch 'develop' into dist/risotto/risotto-2.8.0/develop 2021-04-24 10:12:51 +02:00
4c83e6d89d better log support 2021-04-24 10:12:39 +02:00
19240489db add http static support 2021-04-24 10:12:32 +02:00
30a267bf4a add TiramisuController 2021-04-24 10:12:13 +02:00
56b1f12a4a Merge branch 'develop' into dist/risotto/risotto-2.8.0/develop 2021-04-16 09:34:21 +02:00
f88bcef5c0 do not stop daemon when on_join failed 2021-04-16 09:33:25 +02:00
4fc3e74bbd Merge branch 'develop' into dist/risotto/risotto-2.8.0/develop 2021-04-13 10:31:24 +02:00
5663b2768b if not Risotto module, do not failed 2021-04-13 10:31:14 +02:00
83d74c2b06 Merge branch 'develop' into dist/risotto/risotto-2.8.0/develop 2021-04-12 15:11:54 +02:00
01834c6ba7 add check_role to dispatcher 2021-04-12 15:11:46 +02:00
6a27b002ff Merge branch 'develop' into dist/risotto/risotto-2.8.0/develop 2021-03-27 10:59:19 +01:00
8fdc34c4d3 fix 2021-03-27 10:59:10 +01:00
e2d73932c0 add sdnotify dependency 2020-11-14 19:11:57 +01:00
980a119ef9 Merge branch 'develop' into dist/risotto/risotto-2.8.0/develop 2020-11-14 19:01:34 +01:00
f623feb8a8 add systemd notifier 2020-11-14 19:01:28 +01:00
b9da2ce686 Merge branch 'develop' into dist/risotto/risotto-2.8.0/develop 2020-11-14 08:12:50 +01:00
46f8a4323b add pki informations 2020-11-14 08:12:39 +01:00
941261c830 Merge branch 'develop' into dist/risotto/risotto-2.8.0/develop 2020-10-14 18:30:13 +02:00
6c4bbb3dca add password support 2020-10-14 18:30:05 +02:00
98c77bf719 Merge branch 'develop' into dist/risotto/risotto-2.8.0/develop 2020-09-20 21:33:15 +02:00
279e3a7c4c better debugging 2020-09-20 21:33:04 +02:00
1b9d87fa53 Merge branch 'develop' into dist/risotto/risotto-2.8.0/develop 2020-09-19 10:33:34 +02:00
13c7d5816c update config 2020-09-19 10:33:27 +02:00
0e988d7040 Merge branch 'develop' into dist/risotto/risotto-2.8.0/develop 2020-09-19 09:20:04 +02:00
a89e512266 update config 2020-09-19 09:18:28 +02:00
be97d757d9 Merge branch 'develop' into dist/risotto/risotto-2.8.0/develop 2020-09-16 17:38:04 +02:00
7afccab9b1 publish use now postgresql 2020-09-16 17:37:46 +02:00
13 changed files with 944 additions and 333 deletions

6
debian/control vendored
View File

@ -9,7 +9,11 @@ Homepage: https://forge.cadoles.com/Infra/risotto
Package: python3-risotto
Architecture: any
Pre-Depends: dpkg, python3, ${misc:Pre-Depends}
Depends: ${python:Depends}, ${misc:Depends}, python3-asyncpg, python3-rougail, python3-aiohttp
Depends: ${python:Depends}, ${misc:Depends},
python3-asyncpg,
python3-rougail,
python3-aiohttp,
python3-sdnotify
Description: configuration manager libraries
Package: risotto

View File

@ -1,13 +1,16 @@
#!/usr/bin/env python3
from sdnotify import SystemdNotifier
from asyncio import get_event_loop
from risotto import get_app
if __name__ == '__main__':
notifier = SystemdNotifier()
loop = get_event_loop()
loop.run_until_complete(get_app(loop))
print('HTTP server ready')
notifier.notify("READY=1")
try:
print('HTTP server ready')
loop.run_forever()
except KeyboardInterrupt:
pass

View File

@ -1,8 +1,16 @@
CREATE TABLE log(
CREATE TABLE RisottoLog(
LogId SERIAL PRIMARY KEY,
Msg VARCHAR(255) NOT NULL,
URI VARCHAR(255),
URIS VARCHAR(255),
UserLogin VARCHAR(100) NOT NULL,
Level VARCHAR(10) NOT NULL,
Path VARCHAR(255),
Username VARCHAR(100) NOT NULL,
ContextId INTEGER,
Data JSON,
Date timestamp DEFAULT current_timestamp
Returns JSON,
StartDate timestamp DEFAULT current_timestamp,
StopDate timestamp
);
CREATE INDEX RisottoLog_ContextId_index ON RisottoLog(ContextId);
CREATE INDEX RisottoLog_Login_index ON RisottoLog(UserLogin);
CREATE INDEX RisottoLog_URI_index ON RisottoLog(URI);

View File

@ -1,6 +1,7 @@
from os import environ
from os.path import isfile
from configobj import ConfigObj
from uuid import uuid4
CONFIG_FILE = environ.get('CONFIG_FILE', '/etc/risotto/risotto.conf')
@ -16,14 +17,14 @@ if 'RISOTTO_PORT' in environ:
RISOTTO_PORT = environ['RISOTTO_PORT']
else:
RISOTTO_PORT = config.get('RISOTTO_PORT', 8080)
if 'RISOTTO_URL' in environ:
RISOTTO_URL = environ['RISOTTO_URL']
else:
RISOTTO_URL = config.get('RISOTTO_URL', 'http://localhost:8080/')
if 'CONFIGURATION_DIR' in environ:
CONFIGURATION_DIR = environ['CONFIGURATION_DIR']
else:
CONFIGURATION_DIR = config.get('CONFIGURATION_DIR', '/srv/risotto/configurations')
if 'PROVIDER_FACTORY_CONFIG_DIR' in environ:
PROVIDER_FACTORY_CONFIG_DIR = environ['PROVIDER_FACTORY_CONFIG_DIR']
else:
PROVIDER_FACTORY_CONFIG_DIR = config.get('PROVIDER_FACTORY_CONFIG_DIR', '/srv/factory')
if 'DEFAULT_USER' in environ:
DEFAULT_USER = environ['DEFAULT_USER']
else:
@ -52,6 +53,18 @@ if 'TIRAMISU_DB_USER' in environ:
TIRAMISU_DB_USER = environ['TIRAMISU_DB_USER']
else:
TIRAMISU_DB_USER = config.get('TIRAMISU_DB_USER', 'tiramisu')
if 'CELERYRISOTTO_DB_NAME' in environ:
CELERYRISOTTO_DB_NAME = environ['CELERYRISOTTO_DB_NAME']
else:
CELERYRISOTTO_DB_NAME = config.get('CELERYRISOTTO_DB_NAME', None)
if 'CELERYRISOTTO_DB_PASSWORD' in environ:
CELERYRISOTTO_DB_PASSWORD = environ['CELERYRISOTTO_DB_PASSWORD']
else:
CELERYRISOTTO_DB_PASSWORD = config.get('CELERYRISOTTO_DB_PASSWORD', None)
if 'CELERYRISOTTO_DB_USER' in environ:
CELERYRISOTTO_DB_USER = environ['CELERYRISOTTO_DB_USER']
else:
CELERYRISOTTO_DB_USER = config.get('CELERYRISOTTO_DB_USER', None)
if 'DB_ADDRESS' in environ:
DB_ADDRESS = environ['DB_ADDRESS']
else:
@ -76,6 +89,44 @@ if 'TMP_DIR' in environ:
TMP_DIR = environ['TMP_DIR']
else:
TMP_DIR = config.get('TMP_DIR', '/tmp')
if 'IMAGE_PATH' in environ:
IMAGE_PATH = environ['IMAGE_PATH']
else:
IMAGE_PATH = config.get('IMAGE_PATH', '/tmp')
if 'PASSWORD_ADMIN_USERNAME' in environ:
PASSWORD_ADMIN_USERNAME = environ['PASSWORD_ADMIN_USERNAME']
else:
PASSWORD_ADMIN_USERNAME = config.get('PASSWORD_ADMIN_USERNAME', 'risotto')
if 'PASSWORD_ADMIN_EMAIL' in environ:
PASSWORD_ADMIN_EMAIL = environ['PASSWORD_ADMIN_EMAIL']
else:
# this parameter is mandatory
PASSWORD_ADMIN_EMAIL = config['PASSWORD_ADMIN_EMAIL']
if 'PASSWORD_ADMIN_PASSWORD' in environ:
PASSWORD_ADMIN_PASSWORD = environ['PASSWORD_ADMIN_PASSWORD']
else:
# this parameter is mandatory
PASSWORD_ADMIN_PASSWORD = config['PASSWORD_ADMIN_PASSWORD']
if 'PASSWORD_DEVICE_IDENTIFIER' in environ:
PASSWORD_DEVICE_IDENTIFIER = environ['PASSWORD_DEVICE_IDENTIFIER']
else:
PASSWORD_DEVICE_IDENTIFIER = config.get('PASSWORD_DEVICE_IDENTIFIER', uuid4())
if 'PASSWORD_URL' in environ:
PASSWORD_URL = environ['PASSWORD_URL']
else:
PASSWORD_URL = config.get('PASSWORD_URL', 'https://localhost:8001/')
if 'PKI_ADMIN_PASSWORD' in environ:
PKI_ADMIN_PASSWORD = environ['PKI_ADMIN_PASSWORD']
else:
PKI_ADMIN_PASSWORD = config['PKI_ADMIN_PASSWORD']
if 'PKI_ADMIN_EMAIL' in environ:
PKI_ADMIN_EMAIL = environ['PKI_ADMIN_EMAIL']
else:
PKI_ADMIN_EMAIL = config['PKI_ADMIN_EMAIL']
if 'PKI_URL' in environ:
PKI_URL = environ['PKI_URL']
else:
PKI_URL = config.get('PKI_URL', 'http://localhost:8002')
def dsn_factory(database, user, password, address=DB_ADDRESS):
@ -85,9 +136,11 @@ def dsn_factory(database, user, password, address=DB_ADDRESS):
_config = {'database': {'dsn': dsn_factory(RISOTTO_DB_NAME, RISOTTO_DB_USER, RISOTTO_DB_PASSWORD),
'tiramisu_dsn': dsn_factory(TIRAMISU_DB_NAME, TIRAMISU_DB_USER, TIRAMISU_DB_PASSWORD),
'celery_dsn': dsn_factory(CELERYRISOTTO_DB_NAME, CELERYRISOTTO_DB_USER, CELERYRISOTTO_DB_PASSWORD)
},
'http_server': {'port': RISOTTO_PORT,
'default_user': DEFAULT_USER},
'default_user': DEFAULT_USER,
'url': RISOTTO_URL},
'global': {'message_root_path': MESSAGE_PATH,
'configurations_dir': CONFIGURATION_DIR,
'debug': True,
@ -97,13 +150,24 @@ _config = {'database': {'dsn': dsn_factory(RISOTTO_DB_NAME, RISOTTO_DB_USER, RIS
'sql_dir': SQL_DIR,
'tmp_dir': TMP_DIR,
},
'password': {'admin_username': PASSWORD_ADMIN_USERNAME,
'admin_email': PASSWORD_ADMIN_EMAIL,
'admin_password': PASSWORD_ADMIN_PASSWORD,
'device_identifier': PASSWORD_DEVICE_IDENTIFIER,
'service_url': PASSWORD_URL,
},
'pki': {'admin_password': PKI_ADMIN_PASSWORD,
'owner': PKI_ADMIN_EMAIL,
'url': PKI_URL,
},
'cache': {'root_path': CACHE_ROOT_PATH},
'servermodel': {'internal_source_path': SRV_SEED_PATH,
'internal_source': 'internal'},
'submodule': {'allow_insecure_https': False,
'pki': '192.168.56.112'},
'provider': {'factory_configuration_dir': PROVIDER_FACTORY_CONFIG_DIR,
'factory_configuration_filename': 'infra.json'},
'provider': {'factory_configuration_filename': 'infra.json',
'packer_filename': 'recipe.json',
'risotto_images_dir': IMAGE_PATH},
}

View File

@ -1,3 +1,5 @@
class Context:
def __init__(self):
self.paths = []
self.context_id = None
self.start_id = None

View File

@ -1,6 +1,22 @@
from os import listdir, makedirs
from os.path import join, isdir, isfile
from shutil import rmtree
from traceback import print_exc
from typing import Dict
from rougail import RougailConvert, RougailConfig, RougailUpgrade
try:
from tiramisu3 import Storage, Config
except:
from tiramisu import Storage, Config
from .config import get_config
from .utils import _, tiramisu_display_name
from .logger import log
from .dispatcher import dispatcher
from .context import Context
from .utils import _
RougailConfig['variable_namespace'] = 'configuration'
class Controller:
@ -8,7 +24,7 @@ class Controller:
"""
def __init__(self,
test: bool,
):
) -> None:
pass
async def call(self,
@ -39,16 +55,277 @@ class Controller:
**kwargs,
):
""" a wrapper to dispatcher's publish"""
version, message = uri.split('.', 1)
if args:
raise ValueError(_(f'the URI "{uri}" can only be published with keyword arguments'))
version, message = uri.split('.', 1)
await dispatcher.publish(version,
message,
risotto_context,
**kwargs,
)
@staticmethod
async def check_role(self,
uri: str,
username: str,
**kwargs: dict,
) -> None:
# create a new config
async with await Config(dispatcher.option) as config:
await config.property.read_write()
await config.option('message').value.set(uri)
subconfig = config.option(uri)
for key, value in kwargs.items():
try:
await subconfig.option(key).value.set(value)
except AttributeError:
if get_config()['global']['debug']:
print_exc()
raise ValueError(_(f'unknown parameter in "{uri}": "{key}"'))
except ValueOptionError as err:
raise ValueError(_(f'invalid parameter in "{uri}": {err}'))
await dispatcher.check_role(subconfig,
username,
uri,
)
async def on_join(self,
risotto_context,
):
pass
class TiramisuController(Controller):
def __init__(self,
test: bool,
) -> None:
if not 'dataset_name' in vars(self):
raise Exception(f'please specify "dataset_name" to "{self.__class__.__name__}"')
self.tiramisu_cache_root_path = join(get_config()['cache']['root_path'], self.dataset_name)
if not test:
db_conf = get_config()['database']['tiramisu_dsn']
self.save_storage = Storage(engine='postgres')
self.save_storage.setting(dsn=db_conf)
if self.dataset_name != 'servermodel':
self.optiondescription = None
dispatcher.set_function('v1.setting.dataset.updated',
None,
TiramisuController.dataset_updated,
self.__class__.__module__,
)
async def on_join(self,
risotto_context: Context,
) -> None:
if isdir(self.tiramisu_cache_root_path):
await self.load_datas(risotto_context)
async def dataset_updated(self,
risotto_context: Context,
) -> Dict:
await self.gen_dictionaries(risotto_context)
await self.load_datas(risotto_context)
async def gen_dictionaries(self,
risotto_context: Context,
) -> None:
sources = await self.get_sources(risotto_context)
self._aggregate_tiramisu_funcs(sources)
self._convert_dictionaries_to_tiramisu(sources)
async def get_sources(self,
risotto_context: Context,
) -> None:
return await self.call('v1.setting.source.list',
risotto_context,
)
def _aggregate_tiramisu_funcs(self,
sources: list,
) -> None:
dest_file = join(self.tiramisu_cache_root_path, 'funcs.py')
if not isdir(self.tiramisu_cache_root_path):
makedirs(self.tiramisu_cache_root_path)
with open(dest_file, 'wb') as funcs:
funcs.write(b"""try:
from tiramisu3 import valid_network_netmask, valid_ip_netmask, valid_broadcast, valid_in_network, valid_not_equal as valid_differ, valid_not_equal, calc_value
except:
from tiramisu import valid_network_netmask, valid_ip_netmask, valid_broadcast, valid_in_network, valid_not_equal as valid_differ, valid_not_equal, calc_value
""")
for source in sources:
root_path = join(source['source_directory'],
self.dataset_name,
)
if not isdir(root_path):
continue
for service in listdir(root_path):
path = join(root_path,
service,
'funcs',
)
if not isdir(path):
continue
for filename in listdir(path):
if not filename.endswith('.py'):
continue
filename_path = join(path, filename)
with open(filename_path, 'rb') as fh:
funcs.write(f'# {filename_path}\n'.encode())
funcs.write(fh.read())
funcs.write(b'\n')
def _convert_dictionaries_to_tiramisu(self, sources: list) -> None:
funcs_file = join(self.tiramisu_cache_root_path, 'funcs.py')
tiramisu_file = join(self.tiramisu_cache_root_path, 'tiramisu.py')
dictionaries_dir = join(self.tiramisu_cache_root_path, 'dictionaries')
extras_dictionaries_dir = join(self.tiramisu_cache_root_path, 'extra_dictionaries')
if isdir(dictionaries_dir):
rmtree(dictionaries_dir)
makedirs(dictionaries_dir)
if isdir(extras_dictionaries_dir):
rmtree(extras_dictionaries_dir)
makedirs(extras_dictionaries_dir)
extras = []
upgrade = RougailUpgrade()
for source in sources:
root_path = join(source['source_directory'],
self.dataset_name,
)
if not isdir(root_path):
continue
for service in listdir(root_path):
# upgrade dictionaries
path = join(root_path,
service,
'dictionaries',
)
if not isdir(path):
continue
upgrade.load_xml_from_folders(path,
dictionaries_dir,
RougailConfig['variable_namespace'],
)
for service in listdir(root_path):
# upgrade extra dictionaries
path = join(root_path,
service,
'extras',
)
if not isdir(path):
continue
for namespace in listdir(path):
extra_dir = join(path,
namespace,
)
if not isdir(extra_dir):
continue
extra_dictionaries_dir = join(extras_dictionaries_dir,
namespace,
)
if not isdir(extra_dictionaries_dir):
makedirs(extra_dictionaries_dir)
extras.append((namespace, [extra_dictionaries_dir]))
upgrade.load_xml_from_folders(extra_dir,
extra_dictionaries_dir,
namespace,
)
del upgrade
config = RougailConfig.copy()
config['functions_file'] = funcs_file
config['dictionaries_dir'] = [dictionaries_dir]
config['extra_dictionaries'] = {}
for extra in extras:
config['extra_dictionaries'][extra[0]] = extra[1]
eolobj = RougailConvert(rougailconfig=config)
eolobj.save(tiramisu_file)
async def load(self,
risotto_context: Context,
name: str,
to_deploy: bool=False,
) -> Config:
if self.optiondescription is None:
# use file in cache
tiramisu_file = join(self.tiramisu_cache_root_path, 'tiramisu.py')
if not isfile(tiramisu_file):
raise Exception(_(f'unable to load the "{self.dataset_name}" configuration, is dataset loaded?'))
with open(tiramisu_file) as fileio:
tiramisu_locals = {}
try:
exec(fileio.read(), None, tiramisu_locals)
except Exception as err:
raise Exception(_(f'unable to load tiramisu file {tiramisu_file}: {err}'))
self.optiondescription = tiramisu_locals['option_0']
del tiramisu_locals
try:
letter = self.dataset_name[0]
if not to_deploy:
session_id = f'{letter}_{name}'
else:
session_id = f'{letter}td_{name}'
config = await Config(self.optiondescription,
session_id=session_id,
storage=self.save_storage,
display_name=tiramisu_display_name,
)
# change default rights
await config.property.read_only()
await config.permissive.add('basic')
await config.permissive.add('normal')
await config.permissive.add('expert')
# set information and owner
await config.owner.set(session_id)
await config.information.set(f'{self.dataset_name}_name', name)
except Exception as err:
if get_config()['global']['debug']:
print_exc()
msg = _(f'unable to load config for {self.dataset_name} "{name}": {err}')
await log.error_msg(risotto_context,
None,
msg,
)
return config
async def _deploy_configuration(self,
dico: dict,
) -> None:
config_std = dico['config_to_deploy']
config = dico['config']
# when deploy, calculate force_store_value
ro = await config_std.property.getdefault('read_only', 'append')
if 'force_store_value' not in ro:
await config_std.property.read_write()
if self.dataset_name == 'servermodel':
# server_deployed should be hidden
await config_std.forcepermissive.option('configuration.general.server_deployed').value.set(True)
ro = frozenset(list(ro) + ['force_store_value'])
rw = await config_std.property.getdefault('read_write', 'append')
rw = frozenset(list(rw) + ['force_store_value'])
await config_std.property.setdefault(ro, 'read_only', 'append')
await config_std.property.setdefault(rw, 'read_write', 'append')
await config_std.property.read_only()
# copy informations from 'to deploy' configuration to configuration
await config.value.importation(await config_std.value.exportation())
await config.permissive.importation(await config_std.permissive.exportation())
await config.property.importation(await config_std.property.exportation())
async def build_configuration(self,
config: Config,
) -> dict:
configuration = {}
for option in await config.option.list('optiondescription'):
name = await option.option.name()
if name == 'services':
continue
if name == RougailConfig['variable_namespace']:
fullpath = False
flatten = True
else:
fullpath = True
flatten = False
configuration.update(await option.value.dict(leader_to_list=True, fullpath=fullpath, flatten=flatten))
return configuration

View File

@ -4,6 +4,7 @@ try:
except:
from tiramisu import Config
from tiramisu.error import ValueOptionError
from asyncio import get_event_loop, ensure_future
from traceback import print_exc
from copy import copy
from typing import Dict, Callable, List, Optional
@ -15,7 +16,6 @@ from .logger import log
from .config import get_config
from .context import Context
from . import register
from .remote import Remote
class CallDispatcher:
@ -30,14 +30,11 @@ class CallDispatcher:
if response.impl_get_information('multi'):
if not isinstance(returns, list):
err = _(f'function {module_name}.{function_name} has to return a list')
await log.error_msg(risotto_context, kwargs, err)
raise CallError(str(err))
raise CallError(err)
else:
if not isinstance(returns, dict):
await log.error_msg(risotto_context, kwargs, returns)
err = _(f'function {module_name}.{function_name} has to return a dict')
await log.error_msg(risotto_context, kwargs, err)
raise CallError(str(err))
raise CallError(err)
returns = [returns]
if response is None:
raise Exception('hu?')
@ -50,12 +47,10 @@ class CallDispatcher:
await config.option(key).value.set(value)
except AttributeError:
err = _(f'function {module_name}.{function_name} return the unknown parameter "{key}" for the uri "{risotto_context.version}.{risotto_context.message}"')
await log.error_msg(risotto_context, kwargs, err)
raise CallError(str(err))
except ValueError:
err = _(f'function {module_name}.{function_name} return the parameter "{key}" with an unvalid value "{value}" for the uri "{risotto_context.version}.{risotto_context.message}"')
await log.error_msg(risotto_context, kwargs, err)
raise CallError(str(err))
raise CallError(err)
except ValueError as err:
err = _(f'function {module_name}.{function_name} return the invalid parameter "{key}" for the uri "{risotto_context.version}.{risotto_context.message}": {err}')
raise CallError(err)
await config.property.read_only()
mandatories = await config.value.mandatory()
if mandatories:
@ -65,8 +60,7 @@ class CallDispatcher:
await config.value.dict()
except Exception as err:
err = _(f'function {module_name}.{function_name} return an invalid response {err} for the uri "{risotto_context.version}.{risotto_context.message}"')
await log.error_msg(risotto_context, kwargs, err)
raise CallError(str(err))
raise CallError(err)
async def call(self,
version: str,
@ -79,7 +73,7 @@ class CallDispatcher:
""" execute the function associate with specified uri
arguments are validate before
"""
risotto_context = self.build_new_context(old_risotto_context,
risotto_context = self.build_new_context(old_risotto_context.__dict__,
version,
message,
'rpc',
@ -88,116 +82,193 @@ class CallDispatcher:
raise CallError(_(f'cannot find version of message "{version}"'))
if message not in self.messages[version]:
raise CallError(_(f'cannot find message "{version}.{message}"'))
function_objs = [self.messages[version][message]]
# do not start a new database connection
function_obj = self.messages[version][message]
# log
function_name = function_obj['function'].__name__
info_msg = _(f"call function {function_obj['full_module_name']}.{function_name}")
if hasattr(old_risotto_context, 'connection'):
# do not start a new database connection
risotto_context.connection = old_risotto_context.connection
return await self.launch(version,
message,
risotto_context,
check_role,
kwargs,
function_objs,
internal,
)
else:
risotto_context.log_connection = old_risotto_context.log_connection
await log.start(risotto_context,
kwargs,
info_msg,
)
await self.check_message_type(risotto_context,
kwargs,
)
config_arguments = await self.load_kwargs_to_config(risotto_context,
f'{version}.{message}',
kwargs,
check_role,
internal,
)
try:
async with self.pool.acquire() as connection:
await connection.set_type_codec(
'json',
encoder=dumps,
decoder=loads,
schema='pg_catalog'
)
risotto_context.connection = connection
async with connection.transaction():
return await self.launch(version,
message,
risotto_context,
check_role,
kwargs,
function_objs,
internal,
)
except CallError as err:
raise err
ret = await self.launch(risotto_context,
kwargs,
config_arguments,
function_obj,
)
await log.success(risotto_context,
ret,
)
except Exception as err:
# if there is a problem with arguments, just send an error and do nothing
if get_config()['global']['debug']:
print_exc()
async with self.pool.acquire() as connection:
await connection.set_type_codec(
'json',
encoder=dumps,
decoder=loads,
schema='pg_catalog'
)
risotto_context.connection = connection
async with connection.transaction():
await log.error_msg(risotto_context, kwargs, err)
raise err
class PublishDispatcher:
async def publish(self,
version: str,
message: str,
old_risotto_context: Context,
check_role: bool=False,
internal: bool=True,
**kwargs,
) -> None:
risotto_context = self.build_new_context(old_risotto_context,
version,
message,
'event',
)
try:
function_objs = self.messages[version][message].get('functions', [])
except KeyError:
raise ValueError(_(f'cannot find message {version}.{message}'))
# do not start a new database connection
if hasattr(old_risotto_context, 'connection'):
# publish to remove
remote_kw = dumps({'kwargs': kwargs,
'context': risotto_context.__dict__,
})
risotto_context.connection = old_risotto_context.connection
# FIXME should be better :/
remote_kw = remote_kw.replace("'", "''")
await risotto_context.connection.execute(f'NOTIFY "{version}.{message}", \'{remote_kw}\'')
return await self.launch(version,
message,
risotto_context,
check_role,
kwargs,
function_objs,
internal,
)
async with self.pool.acquire() as connection:
try:
await connection.set_type_codec(
await log.failed(risotto_context,
str(err),
)
raise CallError(err) from err
else:
error = None
async with self.pool.acquire() as log_connection:
await log_connection.set_type_codec(
'json',
encoder=dumps,
decoder=loads,
schema='pg_catalog'
)
risotto_context.connection = connection
async with connection.transaction():
return await self.launch(version,
message,
risotto_context,
check_role,
kwargs,
function_objs,
internal,
)
except CallError as err:
pass
except Exception as err:
# if there is a problem with arguments, log and do nothing
if get_config()['global']['debug']:
print_exc()
async with log_connection.transaction():
try:
risotto_context.log_connection = log_connection
async with self.pool.acquire() as connection:
await connection.set_type_codec(
'json',
encoder=dumps,
decoder=loads,
schema='pg_catalog'
)
risotto_context.connection = connection
async with connection.transaction():
try:
await log.start(risotto_context,
kwargs,
info_msg,
)
await self.check_message_type(risotto_context,
kwargs,
)
config_arguments = await self.load_kwargs_to_config(risotto_context,
f'{version}.{message}',
kwargs,
check_role,
internal,
)
ret = await self.launch(risotto_context,
kwargs,
config_arguments,
function_obj,
)
# log the success
await log.success(risotto_context,
ret,
)
if not internal and isinstance(ret, dict):
ret['context_id'] = risotto_context.context_id
except CallError as err:
if get_config()['global']['debug']:
print_exc()
await log.failed(risotto_context,
str(err),
)
raise err from err
except CallError as err:
error = err
except Exception as err:
# if there is a problem with arguments, just send an error and do nothing
if get_config()['global']['debug']:
print_exc()
await log.failed(risotto_context,
str(err),
)
error = err
if error:
if not internal:
err = CallError(str(error))
err.context_id = risotto_context.context_id
else:
err = error
raise err from error
return ret
class PublishDispatcher:
async def register_remote(self) -> None:
print()
print(_('======== Registered remote event ========'))
self.listened_connection = await self.pool.acquire()
for version, messages in self.messages.items():
for message, message_infos in messages.items():
# event not emit locally
if message_infos['pattern'] == 'event' and 'functions' in message_infos and message_infos['functions']:
# module, submodule, submessage = message.split('.', 2)
# if f'{module}.{submodule}' not in self.injected_self:
uri = f'{version}.{message}'
print(f' - {uri}')
await self.listened_connection.add_listener(uri,
self.to_async_publish,
)
async def publish(self,
version: str,
message: str,
risotto_context: Context,
**kwargs,
) -> None:
if version not in self.messages or message not in self.messages[version]:
raise ValueError(_(f'cannot find URI "{version}.{message}"'))
# publish to remote
remote_kw = dumps({'kwargs': kwargs,
'context': {'username': risotto_context.username,
'paths': risotto_context.paths,
'context_id': risotto_context.context_id,
}
})
# FIXME should be better :/
remote_kw = remote_kw.replace("'", "''")
await risotto_context.connection.execute(f'NOTIFY "{version}.{message}", \'{remote_kw}\'')
def to_async_publish(self,
con: 'asyncpg.connection.Connection',
pid: int,
uri: str,
payload: str,
) -> None:
version, message = uri.split('.', 1)
loop = get_event_loop()
remote_kw = loads(payload)
risotto_context = self.build_new_context(remote_kw['context'],
version,
message,
'event',
)
callback = lambda: ensure_future(self._publish(version,
message,
risotto_context,
**remote_kw['kwargs'],
))
loop.call_soon(callback)
async def _publish(self,
version: str,
message: str,
risotto_context: Context,
**kwargs,
) -> None:
config_arguments = await self.load_kwargs_to_config(risotto_context,
f'{version}.{message}',
kwargs,
False,
False,
)
async with self.pool.acquire() as log_connection:
await log_connection.set_type_codec(
'json',
encoder=dumps,
decoder=loads,
schema='pg_catalog'
)
async with log_connection.transaction():
risotto_context.log_connection = log_connection
async with self.pool.acquire() as connection:
await connection.set_type_codec(
'json',
@ -206,19 +277,61 @@ class PublishDispatcher:
schema='pg_catalog'
)
risotto_context.connection = connection
async with connection.transaction():
await log.error_msg(risotto_context, kwargs, err)
for function_obj in self.messages[version][message]['functions']:
function_name = function_obj['function'].__name__
info_msg = _(f"call function {function_obj['full_module_name']}.{function_name}")
try:
async with connection.transaction():
try:
await log.start(risotto_context,
kwargs,
info_msg,
)
await self.check_message_type(risotto_context,
kwargs,
)
await self.launch(risotto_context,
kwargs,
config_arguments,
function_obj,
)
# log the success
await log.success(risotto_context)
except CallError as err:
if get_config()['global']['debug']:
print_exc()
await log.failed(risotto_context,
str(err),
)
except CallError:
pass
except Exception as err:
# if there is a problem with arguments, log and do nothing
if get_config()['global']['debug']:
print_exc()
async with self.pool.acquire() as connection:
await connection.set_type_codec(
'json',
encoder=dumps,
decoder=loads,
schema='pg_catalog'
)
risotto_context.connection = connection
async with connection.transaction():
await log.failed(risotto_context,
str(err),
)
class Dispatcher(register.RegisterDispatcher,
Remote,
CallDispatcher,
PublishDispatcher):
PublishDispatcher,
):
""" Manage message (call or publish)
so launch a function when a message is called
"""
def build_new_context(self,
old_risotto_context: Context,
context: dict,
version: str,
message: str,
type: str,
@ -227,8 +340,9 @@ class Dispatcher(register.RegisterDispatcher,
"""
uri = version + '.' + message
risotto_context = Context()
risotto_context.username = old_risotto_context.username
risotto_context.paths = copy(old_risotto_context.paths)
risotto_context.username = context['username']
risotto_context.paths = copy(context['paths'])
risotto_context.context_id = context['context_id']
risotto_context.paths.append(uri)
risotto_context.uri = uri
risotto_context.type = type
@ -242,7 +356,6 @@ class Dispatcher(register.RegisterDispatcher,
) -> None:
if self.messages[risotto_context.version][risotto_context.message]['pattern'] != risotto_context.type:
msg = _(f'{risotto_context.uri} is not a {risotto_context.type} message')
await log.error_msg(risotto_context, kwargs, msg)
raise CallError(msg)
async def load_kwargs_to_config(self,
@ -288,7 +401,7 @@ class Dispatcher(register.RegisterDispatcher,
parameters = await subconfig.value.dict()
if extra_parameters:
parameters.update(extra_parameters)
return parameters
return parameters
def get_service(self,
name: str):
@ -297,14 +410,15 @@ class Dispatcher(register.RegisterDispatcher,
async def check_role(self,
config: Config,
user_login: str,
uri: str) -> None:
uri: str,
) -> None:
async with self.pool.acquire() as connection:
async with connection.transaction():
# Verify if user exists and get ID
sql = '''
SELECT UserId
FROM UserUser
WHERE UserLogin = $1
WHERE Login = $1
'''
user_id = await connection.fetchval(sql,
user_login)
@ -342,65 +456,48 @@ class Dispatcher(register.RegisterDispatcher,
raise NotAllowedError(_(f'You ({user_login}) don\'t have any authorisation to access to "{uri}"'))
async def launch(self,
version: str,
message: str,
risotto_context: Context,
check_role: bool,
kwargs: Dict,
function_objs: List,
internal: bool,
config_arguments: dict,
function_obj: Callable,
) -> Optional[Dict]:
await self.check_message_type(risotto_context,
kwargs)
config_arguments = await self.load_kwargs_to_config(risotto_context,
f'{version}.{message}',
kwargs,
check_role,
internal,
)
# config is ok, so send the message
for function_obj in function_objs:
function = function_obj['function']
submodule_name = function_obj['module']
function_name = function.__name__
risotto_context.module = submodule_name.split('.', 1)[0]
info_msg = _(f'in module {submodule_name}.{function_name}')
# build argument for this function
if risotto_context.type == 'rpc':
kw = config_arguments
else:
kw = {}
for key, value in config_arguments.items():
if key in function_obj['arguments']:
kw[key] = value
# so send the message
function = function_obj['function']
risotto_context.module = function_obj['module'].split('.', 1)[0]
# build argument for this function
if risotto_context.type == 'rpc':
kw = config_arguments
else:
kw = {}
for key, value in config_arguments.items():
if key in function_obj['arguments']:
kw[key] = value
kw['risotto_context'] = risotto_context
returns = await function(self.injected_self[function_obj['module']], **kw)
if risotto_context.type == 'rpc':
# valid returns
await self.valid_call_returns(risotto_context,
function,
returns,
kwargs)
# log the success
await log.info_msg(risotto_context,
{'arguments': kwargs,
'returns': returns},
info_msg)
# notification
if function_obj.get('notification'):
notif_version, notif_message = function_obj['notification'].split('.', 1)
if not isinstance(returns, list):
send_returns = [returns]
else:
send_returns = returns
for ret in send_returns:
await self.publish(notif_version,
notif_message,
risotto_context,
**ret)
if risotto_context.type == 'rpc':
return returns
kw['risotto_context'] = risotto_context
# launch
returns = await function(self.get_service(function_obj['module']), **kw)
if risotto_context.type == 'rpc':
# valid returns
await self.valid_call_returns(risotto_context,
function,
returns,
kwargs,
)
# notification
if function_obj.get('notification'):
notif_version, notif_message = function_obj['notification'].split('.', 1)
if not isinstance(returns, list):
send_returns = [returns]
else:
send_returns = returns
for ret in send_returns:
await self.publish(notif_version,
notif_message,
risotto_context,
**ret,
)
if risotto_context.type == 'rpc':
return returns
dispatcher = Dispatcher()

View File

@ -1,4 +1,4 @@
from aiohttp.web import Application, Response, get, post, HTTPBadRequest, HTTPInternalServerError, HTTPNotFound
from aiohttp.web import Application, Response, get, post, HTTPBadRequest, HTTPInternalServerError, HTTPNotFound, static
from json import dumps
from traceback import print_exc
try:
@ -12,12 +12,13 @@ from .utils import _
from .context import Context
from .error import CallError, NotAllowedError, RegistrationError
from .message import get_messages
from .logger import log
#from .logger import log
from .config import get_config
from . import services
extra_routes = {}
extra_statics = {}
def create_context(request):
@ -29,19 +30,31 @@ def create_context(request):
def register(version: str,
path: str):
path: str,
):
""" Decorator to register function to the http route
"""
def decorator(function):
if path in extra_routes:
raise RegistrationError(f'the route {path} is already registered')
raise RegistrationError(f'the route "{path}" is already registered')
extra_routes[path] = {'function': function,
'version': version}
'version': version,
}
return decorator
def register_static(path: str,
directory: str,
) -> None:
if path in extra_statics:
raise RegistrationError(f'the static path "{path}" is already registered')
extra_statics[path] = directory
class extra_route_handler:
async def __new__(cls, request):
async def __new__(cls,
request,
):
kwargs = dict(request.match_info)
kwargs['request'] = request
kwargs['risotto_context'] = create_context(request)
@ -67,7 +80,8 @@ class extra_route_handler:
# await log.info_msg(kwargs['risotto_context'],
# dict(request.match_info))
return Response(text=dumps(returns),
content_type='application/json')
content_type='application/json',
)
async def handle(request):
@ -87,20 +101,38 @@ async def handle(request):
internal=False,
**kwargs,
)
except NotAllowedError as err:
raise HTTPNotFound(reason=str(err))
except CallError as err:
raise HTTPBadRequest(reason=str(err).replace('\n', ' '))
except Exception as err:
if get_config()['global']['debug']:
print_exc()
raise HTTPInternalServerError(reason=str(err))
return Response(text=dumps({'response': text}),
content_type='application/json')
context_id = None
if isinstance(err, NotAllowedError):
error_type = HTTPNotFound
elif isinstance(err, CallError):
error_type = HTTPBadRequest
context_id = err.context_id
else:
if get_config()['global']['debug']:
print_exc()
error_type = HTTPInternalServerError
response = {'type': 'error',
'reason': str(err).replace('\n', ' '),
}
if context_id is not None:
response['context_id'] = context_id
err = dumps({'response': response,
'type': 'error',
})
raise error_type(text=err,
content_type='application/json',
)
return Response(text=dumps({'response': text,
'type': 'success',
}),
content_type='application/json',
)
async def api(request,
risotto_context):
risotto_context,
):
global TIRAMISU
if not TIRAMISU:
# check all URI that have an associated role
@ -130,7 +162,7 @@ async def api(request,
async def get_app(loop):
""" build all routes
"""
global extra_routes
global extra_routes, extra_statics
services.link_to_dispatcher(dispatcher)
app = Application(loop=loop)
routes = []
@ -152,7 +184,8 @@ async def get_app(loop):
for version in versions:
api_route = {'function': api,
'version': version,
'path': f'/api/{version}'}
'path': f'/api/{version}',
}
extra_handler = type(api_route['path'], (extra_route_handler,), api_route)
routes.append(get(api_route['path'], extra_handler))
print(f' - {api_route["path"]} (http_get)')
@ -169,12 +202,22 @@ async def get_app(loop):
extra_handler = type(path, (extra_route_handler,), extra)
routes.append(get(path, extra_handler))
print(f' - {path} (http_get)')
if extra_statics:
if not extra_routes:
print(_('======== Registered static routes ========'))
for path, directory in extra_statics.items():
routes.append(static(path, directory))
print(f' - {path} (static)')
del extra_routes
del extra_statics
app.router.add_routes(routes)
await dispatcher.register_remote()
print()
await dispatcher.on_join()
return await loop.create_server(app.make_handler(), '*', get_config()['http_server']['port'])
return await loop.create_server(app.make_handler(),
'*',
get_config()['http_server']['port'],
)
TIRAMISU = None

View File

@ -1,7 +1,7 @@
from typing import Dict, Any
from json import dumps
from typing import Dict, Any, Optional
from json import dumps, loads
from asyncpg.exceptions import UndefinedTableError
from datetime import datetime
from .context import Context
from .utils import _
@ -13,26 +13,79 @@ class Logger:
"""
async def insert(self,
msg: str,
path: str,
risotto_context: str,
uri: str,
uris: str,
risotto_context: Context,
level: str,
data: Any= None) -> None:
insert = 'INSERT INTO log(Msg, Path, Username, Level'
values = 'VALUES($1,$2,$3,$4'
args = [msg, path, risotto_context.username, level]
data: Any=None,
start: bool=False,
) -> None:
insert = 'INSERT INTO RisottoLog(Msg, URI, URIS, UserLogin, Level'
values = 'VALUES($1,$2,$3,$4,$5'
args = [msg, uri, uris, risotto_context.username, level]
if data:
insert += ', Data'
values += ',$5'
values += ',$6'
args.append(dumps(data))
context_id = risotto_context.context_id
if context_id is not None:
insert += ', ContextId'
if data:
values += ',$7'
else:
values += ',$6'
args.append(context_id)
sql = insert + ') ' + values + ')'
sql = insert + ') ' + values + ') RETURNING LogId'
try:
await risotto_context.connection.fetch(sql, *args)
log_id = await risotto_context.log_connection.fetchval(sql, *args)
if context_id is None and start:
risotto_context.context_id = log_id
if start:
risotto_context.start_id = log_id
except UndefinedTableError as err:
raise Exception(_(f'cannot access to database ({err}), was the database really created?'))
async def query(self,
risotto_context: Context,
context_id: int,
uri: Optional[str],
) -> list:
sql = '''SELECT Msg as msg, URI as uri_name, URIS as uris, UserLogin as user_login, Level as level, Data as data, StartDate as start_date, StopDate as stop_date
FROM RisottoLog
WHERE UserLogin = $1 AND (LogId = $2 OR ContextId = $2)
'''
args = [sql, risotto_context.username, context_id]
if uri is not None:
sql += ' AND URI = $3'
args.append(uri)
ret = []
for row in await risotto_context.log_connection.fetch(*args):
d = {}
for key, value in row.items():
if key == 'data':
if not value:
value = {}
# else:
# value = loads(value)
elif key in ['start_date', 'stop_date']:
value = str(value)
d[key] = value
ret.append(d)
return ret
def _get_last_uri(self,
risotto_context: Context,
) -> str:
if risotto_context.paths:
return risotto_context.paths[-1]
return ''
def _get_message_paths(self,
risotto_context: Context):
risotto_context: Context,
) -> str:
if not risotto_context.paths:
return ''
paths = risotto_context.paths
if risotto_context.type:
paths_msg = f' {risotto_context.type} '
@ -49,44 +102,117 @@ class Logger:
risotto_context: Context,
arguments,
error: str,
msg: str=''):
msg: str='',
):
""" send message when an error append
"""
paths_msg = self._get_message_paths(risotto_context)
print(_(f'{risotto_context.username}: ERROR: {error} ({paths_msg} with arguments "{arguments}": {msg})'))
await self.insert(msg,
self._get_last_uri(risotto_context),
paths_msg,
risotto_context,
'Error',
arguments)
arguments,
)
async def info_msg(self,
risotto_context: Context,
arguments: Dict,
msg: str=''):
msg: str='',
) -> None:
""" send message with common information
"""
if risotto_context.paths:
paths_msg = self._get_message_paths(risotto_context)
else:
paths_msg = ''
paths_msg = self._get_message_paths(risotto_context)
if get_config()['global']['debug']:
print(_(f'{risotto_context.username}: INFO:{paths_msg}: {msg}'))
await self.insert(msg,
self._get_last_uri(risotto_context),
paths_msg,
risotto_context,
'Info',
arguments)
arguments,
)
async def start(self,
risotto_context: Context,
arguments: dict,
msg: str,
) -> None:
paths_msg = self._get_message_paths(risotto_context)
if get_config()['global']['debug']:
if risotto_context.context_id != None:
context = f'({risotto_context.context_id})'
else:
context = ''
print(_(f'{risotto_context.username}: START{context}:{paths_msg}: {msg}'))
await self.insert(msg,
self._get_last_uri(risotto_context),
paths_msg,
risotto_context,
'Start',
arguments,
start=True,
)
async def success(self,
risotto_context: Context,
returns: Optional[dict]=None,
) -> None:
if get_config()['global']['debug']:
paths_msg = self._get_message_paths(risotto_context)
print(_(f'{risotto_context.username}: SUCCESS({risotto_context.context_id}):{paths_msg}'))
sql = """UPDATE RisottoLog
SET StopDate = $2,
Level = 'SUCCESS'
"""
args = [datetime.now()]
if returns:
sql += """, Returns = $3
"""
args.append(dumps(returns))
sql += """WHERE LogId = $1
"""
await risotto_context.log_connection.execute(sql,
risotto_context.start_id,
*args,
)
async def failed(self,
risotto_context: Context,
err: str,
) -> None:
if get_config()['global']['debug']:
paths_msg = self._get_message_paths(risotto_context)
if risotto_context.context_id != None:
context = f'({risotto_context.context_id})'
else:
context = ''
print(_(f'{risotto_context.username}: FAILED({risotto_context.context_id}):{paths_msg}: err'))
sql = """UPDATE RisottoLog
SET StopDate = $2,
Level = 'FAILED',
Msg = $3
WHERE LogId = $1
"""
await risotto_context.log_connection.execute(sql,
risotto_context.start_id,
datetime.now(),
err,
)
async def info(self,
risotto_context,
msg):
msg,
):
if get_config()['global']['debug']:
print(msg)
await self.insert(msg,
'',
None,
risotto_context,
'Info')
'Info',
)
log = Logger()

View File

@ -7,6 +7,7 @@ from typing import Callable, Optional, List
from asyncpg import create_pool
from json import dumps, loads
from pkg_resources import iter_entry_points
from traceback import print_exc
import risotto
from .utils import _
from .error import RegistrationError
@ -23,7 +24,7 @@ class Services():
def load_services(self):
for entry_point in iter_entry_points(group='risotto_services'):
self.services.setdefault(entry_point.name, [])
self.services.setdefault(entry_point.name, {})
self.services_loaded = True
def load_modules(self,
@ -32,21 +33,20 @@ class Services():
for entry_point in iter_entry_points(group='risotto_modules'):
service_name, module_name = entry_point.name.split('.')
if limit_services is None or service_name in limit_services:
setattr(self, module_name, entry_point.load())
self.services[service_name].append(module_name)
self.services[service_name][module_name] = entry_point.load()
self.modules_loaded = True
def get_services(self):
if not self.services_loaded:
self.load_services()
return [(s, getattr(self, s)) for s in self.services]
#
# def get_services(self):
# if not self.services_loaded:
# self.load_services()
# return [(service, getattr(self, service)) for service in self.services]
def get_modules(self,
limit_services: Optional[List[str]]=None,
) -> List[str]:
if not self.modules_loaded:
self.load_modules(limit_services=limit_services)
return [(module + '.' + submodule, getattr(self, submodule)) for module, submodules in self.services.items() for submodule in submodules]
return [(module + '.' + submodule, entry_point) for module, submodules in self.services.items() for submodule, entry_point in submodules.items()]
def get_services_list(self):
return self.services.keys()
@ -84,11 +84,11 @@ def register(uris: str,
def decorator(function):
for uri in uris:
version, message = uri.split('.', 1)
dispatcher.set_function(version,
message,
dispatcher.set_function(uri,
notification,
function)
function,
function.__module__
)
return decorator
@ -185,21 +185,21 @@ class RegisterDispatcher:
raise RegistrationError(_(f'error with {module_name}.{function_name} arguments: {msg}'))
def set_function(self,
version: str,
message: str,
uri: str,
notification: str,
function: Callable,
full_module_name: str,
):
""" register a function to an URI
URI is a message
"""
version, message = uri.split('.', 1)
# check if message exists
if message not in self.messages[version]:
raise RegistrationError(_(f'the message {message} not exists'))
# xxx submodule can only be register with v1.yyy.xxx..... message
risotto_module_name, submodule_name = function.__module__.split('.')[-3:-1]
risotto_module_name, submodule_name = full_module_name.split('.')[-3:-1]
module_name = risotto_module_name.split('_')[-1]
message_module, message_submodule, message_name = message.split('.', 2)
if message_module not in self.risotto_modules:
@ -214,7 +214,7 @@ class RegisterDispatcher:
# check if already register
if 'function' in self.messages[version][message]:
raise RegistrationError(_(f'uri {version}.{message} already registered'))
raise RegistrationError(_(f'uri {uri} already registered'))
# register
if self.messages[version][message]['pattern'] == 'rpc':
@ -224,6 +224,7 @@ class RegisterDispatcher:
register(version,
message,
f'{module_name}.{submodule_name}',
full_module_name,
function,
function_args,
notification,
@ -233,11 +234,13 @@ class RegisterDispatcher:
version: str,
message: str,
module_name: str,
full_module_name: str,
function: Callable,
function_args: list,
notification: Optional[str],
):
self.messages[version][message]['module'] = module_name
self.messages[version][message]['full_module_name'] = full_module_name
self.messages[version][message]['function'] = function
self.messages[version][message]['arguments'] = function_args
if notification:
@ -247,6 +250,7 @@ class RegisterDispatcher:
version: str,
message: str,
module_name: str,
full_module_name: str,
function: Callable,
function_args: list,
notification: Optional[str],
@ -255,8 +259,10 @@ class RegisterDispatcher:
self.messages[version][message]['functions'] = []
dico = {'module': module_name,
'full_module_name': full_module_name,
'function': function,
'arguments': function_args}
'arguments': function_args,
}
if notification and notification:
dico['notification'] = notification
self.messages[version][message]['functions'].append(dico)
@ -271,7 +277,7 @@ class RegisterDispatcher:
try:
self.injected_self[submodule_name] = module.Risotto(test)
except AttributeError as err:
raise RegistrationError(_(f'unable to register the module {submodule_name}, this module must have Risotto class'))
print(_(f'unable to register the module {submodule_name}, this module must have Risotto class'))
def validate(self):
""" check if all messages have a function
@ -281,9 +287,9 @@ class RegisterDispatcher:
for message, message_obj in messages.items():
if not 'functions' in message_obj and not 'function' in message_obj:
if message_obj['pattern'] == 'event':
print(f'{message} prêche dans le désert')
print(f'{version}.{message} prêche dans le désert')
else:
missing_messages.append(message)
missing_messages.append(f'{version}.{message}')
if missing_messages:
raise RegistrationError(_(f'no matching function for uri {missing_messages}'))
@ -291,29 +297,38 @@ class RegisterDispatcher:
truncate: bool=False,
) -> None:
internal_user = get_config()['global']['internal_user']
async with self.pool.acquire() as connection:
await connection.set_type_codec(
'json',
encoder=dumps,
decoder=loads,
schema='pg_catalog'
)
if truncate:
async with connection.transaction():
await connection.execute('TRUNCATE InfraServer, InfraSite, InfraZone, Log, ProviderDeployment, ProviderFactoryCluster, ProviderFactoryClusterNode, SettingApplicationservice, SettingApplicationServiceDependency, SettingRelease, SettingServer, SettingServermodel, SettingSource, UserRole, UserRoleURI, UserURI, UserUser, InfraServermodel, ProviderZone, ProviderServer, ProviderSource, ProviderApplicationservice ProviderServermodel')
async with connection.transaction():
for submodule_name, module in self.injected_self.items():
risotto_context = Context()
risotto_context.username = internal_user
risotto_context.paths.append(f'internal.{submodule_name}.on_join')
risotto_context.type = None
risotto_context.connection = connection
risotto_context.module = submodule_name.split('.', 1)[0]
info_msg = _(f'in module risotto_{submodule_name}.on_join')
await log.info_msg(risotto_context,
None,
info_msg)
await module.on_join(risotto_context)
async with self.pool.acquire() as log_connection:
async with log_connection.transaction():
async with self.pool.acquire() as connection:
await connection.set_type_codec(
'json',
encoder=dumps,
decoder=loads,
schema='pg_catalog'
)
if truncate:
async with connection.transaction():
await connection.execute('TRUNCATE InfraServer, InfraSite, InfraZone, Log, ProviderDeployment, ProviderFactoryCluster, ProviderFactoryClusterNode, SettingApplicationservice, SettingApplicationServiceDependency, SettingRelease, SettingServer, SettingServermodel, SettingSource, UserRole, UserRoleURI, UserURI, UserUser, InfraServermodel, ProviderZone, ProviderServer, ProviderSource, ProviderApplicationservice, ProviderServermodel')
async with connection.transaction():
for submodule_name, module in self.injected_self.items():
risotto_context = Context()
risotto_context.username = internal_user
risotto_context.paths.append(f'internal.{submodule_name}.on_join')
risotto_context.type = None
risotto_context.log_connection = log_connection
risotto_context.connection = connection
risotto_context.module = submodule_name.split('.', 1)[0]
info_msg = _(f'in function risotto_{submodule_name}.on_join')
await log.info_msg(risotto_context,
None,
info_msg)
try:
await module.on_join(risotto_context)
except Exception as err:
if get_config()['global']['debug']:
print_exc()
msg = _(f'on_join returns an error in module {submodule_name}: {err}')
await log.error_msg(risotto_context, {}, msg)
async def load(self):
# valid function's arguments

View File

@ -1,42 +0,0 @@
from asyncio import get_event_loop, ensure_future
from json import loads
from .context import Context
from .config import get_config
from .utils import _
class Remote:
async def register_remote(self) -> None:
print()
print(_('======== Registered remote event ========'))
self.listened_connection = await self.pool.acquire()
for version, messages in self.messages.items():
for message, message_infos in messages.items():
# event not emit locally
if message_infos['pattern'] == 'event':
module, submodule, submessage = message.split('.', 2)
if f'{module}.{submodule}' not in self.injected_self:
uri = f'{version}.{message}'
print(f' - {uri}')
await self.listened_connection.add_listener(uri, self.to_async_publish)
def to_async_publish(self,
con: 'asyncpg.connection.Connection',
pid: int,
uri: str,
payload: str,
) -> None:
version, message = uri.split('.', 1)
loop = get_event_loop()
remote_kw = loads(payload)
context = Context()
for key, value in remote_kw['context'].items():
setattr(context, key, value)
callback = lambda: ensure_future(self.publish(version,
message,
context,
**remote_kw['kwargs'],
))
loop.call_soon(callback)

View File

@ -1,9 +1,27 @@
class Undefined:
pass
undefined = Undefined()
def _(s):
return s
undefined = Undefined()
def tiramisu_display_name(kls,
dyn_name: 'Base'=None,
suffix: str=None,
) -> str:
if dyn_name is not None:
name = dyn_name
else:
name = kls.impl_getname()
doc = kls.impl_get_information('doc', None)
if doc:
doc = str(doc)
if doc.endswith('.'):
doc = doc[:-1]
if suffix:
doc += suffix
if name != doc:
name += f' ({doc})'
return name

View File

@ -392,7 +392,6 @@ async def test_server_created_base():
release_distribution='last',
site_name='site_1',
zones_name=['zones'],
zones_ip=['1.1.1.1'],
)
assert list(config_module.server) == [server_name]
assert set(config_module.server[server_name]) == {'server', 'server_to_deploy', 'funcs_file'}
@ -420,7 +419,6 @@ async def test_server_created_own_sm():
release_distribution='last',
site_name='site_1',
zones_name=['zones'],
zones_ip=['1.1.1.1'],
)
assert list(config_module.server) == [server_name]
assert set(config_module.server[server_name]) == {'server', 'server_to_deploy', 'funcs_file'}
@ -469,7 +467,6 @@ async def test_server_configuration_get():
release_distribution='last',
site_name='site_1',
zones_name=['zones'],
zones_ip=['1.1.1.1'],
)
#
await config_module.server[server_name]['server'].property.read_write()
@ -515,7 +512,6 @@ async def test_server_configuration_deployed():
release_distribution='last',
site_name='site_1',
zones_name=['zones'],
zones_ip=['1.1.1.1'],
)
#
await config_module.server[server_name]['server'].property.read_write()