Compare commits

..

11 Commits

11 changed files with 214 additions and 310 deletions

5
debian/changelog vendored
View File

@ -1,5 +0,0 @@
risotto (0.1) unstable; urgency=low
* first version
-- Cadoles <contact@cadoles.com> Fri, 20 Mar 2020 15:18:25 +0100

10
debian/control vendored
View File

@ -2,13 +2,19 @@ Source: risotto
Section: admin
Priority: extra
Maintainer: Cadoles <contact@cadoles.com>
Build-depends: debhelper (>=11), python3-all, python3-setuptools
Build-depends: debhelper (>=11), python3-all, python3-setuptools, dh-python
Standards-Version: 3.9.4
Homepage: https://forge.cadoles.com/Infra/risotto
Package: python3-risotto
Architecture: any
Pre-Depends: dpkg, python3, ${misc:Pre-Depends}
Depends: ${python:Depends}, ${misc:Depends}, python3-asyncpg, python3-rougail, python3-aiohttp
Description: configuration manager libraries
Package: risotto
Architecture: any
Pre-Depends: dpkg, python3, ${misc:Pre-Depends}
Depends: ${python:Depends}, ${misc:Depends}
Depends: ${python:Depends}, ${misc:Depends}, python3-risotto
Description: configuration manager

2
debian/risotto.install vendored Normal file
View File

@ -0,0 +1,2 @@
script/risotto-server usr/bin/
sql/risotto.sql usr/share/eole/db/eole-risotto/gen/

5
script/risotto-server Normal file → Executable file
View File

@ -1,16 +1,13 @@
#!/usr/bin/env python3
from sdnotify import SystemdNotifier
from asyncio import get_event_loop
from risotto import get_app
if __name__ == '__main__':
notifier = SystemdNotifier()
loop = get_event_loop()
loop.run_until_complete(get_app(loop))
print('HTTP server ready')
notifier.notify("READY=1")
try:
print('HTTP server ready')
loop.run_forever()
except KeyboardInterrupt:
pass

View File

@ -1,7 +1,6 @@
from os import environ
from os.path import isfile
from configobj import ConfigObj
from uuid import uuid4
CONFIG_FILE = environ.get('CONFIG_FILE', '/etc/risotto/risotto.conf')
@ -21,6 +20,10 @@ if 'CONFIGURATION_DIR' in environ:
CONFIGURATION_DIR = environ['CONFIGURATION_DIR']
else:
CONFIGURATION_DIR = config.get('CONFIGURATION_DIR', '/srv/risotto/configurations')
if 'PROVIDER_FACTORY_CONFIG_DIR' in environ:
PROVIDER_FACTORY_CONFIG_DIR = environ['PROVIDER_FACTORY_CONFIG_DIR']
else:
PROVIDER_FACTORY_CONFIG_DIR = config.get('PROVIDER_FACTORY_CONFIG_DIR', '/srv/factory')
if 'DEFAULT_USER' in environ:
DEFAULT_USER = environ['DEFAULT_USER']
else:
@ -49,18 +52,6 @@ if 'TIRAMISU_DB_USER' in environ:
TIRAMISU_DB_USER = environ['TIRAMISU_DB_USER']
else:
TIRAMISU_DB_USER = config.get('TIRAMISU_DB_USER', 'tiramisu')
if 'CELERYRISOTTO_DB_NAME' in environ:
CELERYRISOTTO_DB_NAME = environ['CELERYRISOTTO_DB_NAME']
else:
CELERYRISOTTO_DB_NAME = config.get('CELERYRISOTTO_DB_NAME', None)
if 'CELERYRISOTTO_DB_PASSWORD' in environ:
CELERYRISOTTO_DB_PASSWORD = environ['CELERYRISOTTO_DB_PASSWORD']
else:
CELERYRISOTTO_DB_PASSWORD = config.get('CELERYRISOTTO_DB_PASSWORD', None)
if 'CELERYRISOTTO_DB_USER' in environ:
CELERYRISOTTO_DB_USER = environ['CELERYRISOTTO_DB_USER']
else:
CELERYRISOTTO_DB_USER = config.get('CELERYRISOTTO_DB_USER', None)
if 'DB_ADDRESS' in environ:
DB_ADDRESS = environ['DB_ADDRESS']
else:
@ -85,44 +76,6 @@ if 'TMP_DIR' in environ:
TMP_DIR = environ['TMP_DIR']
else:
TMP_DIR = config.get('TMP_DIR', '/tmp')
if 'IMAGE_PATH' in environ:
IMAGE_PATH = environ['IMAGE_PATH']
else:
IMAGE_PATH = config.get('IMAGE_PATH', '/tmp')
if 'PASSWORD_ADMIN_USERNAME' in environ:
PASSWORD_ADMIN_USERNAME = environ['PASSWORD_ADMIN_USERNAME']
else:
PASSWORD_ADMIN_USERNAME = config.get('PASSWORD_ADMIN_USERNAME', 'risotto')
if 'PASSWORD_ADMIN_EMAIL' in environ:
PASSWORD_ADMIN_EMAIL = environ['PASSWORD_ADMIN_EMAIL']
else:
# this parameter is mandatory
PASSWORD_ADMIN_EMAIL = config['PASSWORD_ADMIN_EMAIL']
if 'PASSWORD_ADMIN_PASSWORD' in environ:
PASSWORD_ADMIN_PASSWORD = environ['PASSWORD_ADMIN_PASSWORD']
else:
# this parameter is mandatory
PASSWORD_ADMIN_PASSWORD = config['PASSWORD_ADMIN_PASSWORD']
if 'PASSWORD_DEVICE_IDENTIFIER' in environ:
PASSWORD_DEVICE_IDENTIFIER = environ['PASSWORD_DEVICE_IDENTIFIER']
else:
PASSWORD_DEVICE_IDENTIFIER = config.get('PASSWORD_DEVICE_IDENTIFIER', uuid4())
if 'PASSWORD_URL' in environ:
PASSWORD_URL = environ['PASSWORD_URL']
else:
PASSWORD_URL = config.get('PASSWORD_URL', 'https://localhost:8001/')
if 'PKI_ADMIN_PASSWORD' in environ:
PKI_ADMIN_PASSWORD = environ['PKI_ADMIN_PASSWORD']
else:
PKI_ADMIN_PASSWORD = config['PKI_ADMIN_PASSWORD']
if 'PKI_ADMIN_EMAIL' in environ:
PKI_ADMIN_EMAIL = environ['PKI_ADMIN_EMAIL']
else:
PKI_ADMIN_EMAIL = config['PKI_ADMIN_EMAIL']
if 'PKI_URL' in environ:
PKI_URL = environ['PKI_URL']
else:
PKI_URL = config.get('PKI_URL', 'http://localhost:8002')
def dsn_factory(database, user, password, address=DB_ADDRESS):
@ -132,7 +85,6 @@ def dsn_factory(database, user, password, address=DB_ADDRESS):
_config = {'database': {'dsn': dsn_factory(RISOTTO_DB_NAME, RISOTTO_DB_USER, RISOTTO_DB_PASSWORD),
'tiramisu_dsn': dsn_factory(TIRAMISU_DB_NAME, TIRAMISU_DB_USER, TIRAMISU_DB_PASSWORD),
'celery_dsn': dsn_factory(CELERYRISOTTO_DB_NAME, CELERYRISOTTO_DB_USER, CELERYRISOTTO_DB_PASSWORD)
},
'http_server': {'port': RISOTTO_PORT,
'default_user': DEFAULT_USER},
@ -145,24 +97,13 @@ _config = {'database': {'dsn': dsn_factory(RISOTTO_DB_NAME, RISOTTO_DB_USER, RIS
'sql_dir': SQL_DIR,
'tmp_dir': TMP_DIR,
},
'password': {'admin_username': PASSWORD_ADMIN_USERNAME,
'admin_email': PASSWORD_ADMIN_EMAIL,
'admin_password': PASSWORD_ADMIN_PASSWORD,
'device_identifier': PASSWORD_DEVICE_IDENTIFIER,
'service_url': PASSWORD_URL,
},
'pki': {'admin_password': PKI_ADMIN_PASSWORD,
'owner': PKI_ADMIN_EMAIL,
'url': PKI_URL,
},
'cache': {'root_path': CACHE_ROOT_PATH},
'servermodel': {'internal_source_path': SRV_SEED_PATH,
'internal_source': 'internal'},
'submodule': {'allow_insecure_https': False,
'pki': '192.168.56.112'},
'provider': {'factory_configuration_filename': 'infra.json',
'packer_filename': 'recipe.json',
'risotto_images_dir': IMAGE_PATH},
'provider': {'factory_configuration_dir': PROVIDER_FACTORY_CONFIG_DIR,
'factory_configuration_filename': 'infra.json'},
}

View File

@ -39,40 +39,15 @@ class Controller:
**kwargs,
):
""" a wrapper to dispatcher's publish"""
version, message = uri.split('.', 1)
if args:
raise ValueError(_(f'the URI "{uri}" can only be published with keyword arguments'))
version, message = uri.split('.', 1)
await dispatcher.publish(version,
message,
risotto_context,
**kwargs,
)
@staticmethod
async def check_role(self,
uri: str,
username: str,
**kwargs: dict,
) -> None:
# create a new config
async with await Config(dispatcher.option) as config:
await config.property.read_write()
await config.option('message').value.set(uri)
subconfig = config.option(uri)
for key, value in kwargs.items():
try:
await subconfig.option(key).value.set(value)
except AttributeError:
if get_config()['global']['debug']:
print_exc()
raise ValueError(_(f'unknown parameter in "{uri}": "{key}"'))
except ValueOptionError as err:
raise ValueError(_(f'invalid parameter in "{uri}": {err}'))
await dispatcher.check_role(subconfig,
username,
uri,
)
async def on_join(self,
risotto_context,
):

View File

@ -4,7 +4,6 @@ try:
except:
from tiramisu import Config
from tiramisu.error import ValueOptionError
from asyncio import get_event_loop, ensure_future
from traceback import print_exc
from copy import copy
from typing import Dict, Callable, List, Optional
@ -16,6 +15,7 @@ from .logger import log
from .config import get_config
from .context import Context
from . import register
from .remote import Remote
class CallDispatcher:
@ -79,7 +79,7 @@ class CallDispatcher:
""" execute the function associate with specified uri
arguments are validate before
"""
risotto_context = self.build_new_context(old_risotto_context.__dict__,
risotto_context = self.build_new_context(old_risotto_context,
version,
message,
'rpc',
@ -88,35 +88,20 @@ class CallDispatcher:
raise CallError(_(f'cannot find version of message "{version}"'))
if message not in self.messages[version]:
raise CallError(_(f'cannot find message "{version}.{message}"'))
function_obj = self.messages[version][message]
function_objs = [self.messages[version][message]]
# do not start a new database connection
if hasattr(old_risotto_context, 'connection'):
risotto_context.connection = old_risotto_context.connection
await self.check_message_type(risotto_context,
kwargs,
)
config_arguments = await self.load_kwargs_to_config(risotto_context,
f'{version}.{message}',
kwargs,
return await self.launch(version,
message,
risotto_context,
check_role,
internal,
)
return await self.launch(risotto_context,
kwargs,
config_arguments,
function_obj,
function_objs,
internal,
)
else:
try:
await self.check_message_type(risotto_context,
kwargs,
)
config_arguments = await self.load_kwargs_to_config(risotto_context,
f'{version}.{message}',
kwargs,
check_role,
internal,
)
async with self.pool.acquire() as connection:
await connection.set_type_codec(
'json',
@ -126,10 +111,13 @@ class CallDispatcher:
)
risotto_context.connection = connection
async with connection.transaction():
return await self.launch(risotto_context,
return await self.launch(version,
message,
risotto_context,
check_role,
kwargs,
config_arguments,
function_obj,
function_objs,
internal,
)
except CallError as err:
raise err
@ -151,80 +139,43 @@ class CallDispatcher:
class PublishDispatcher:
async def register_remote(self) -> None:
print()
print(_('======== Registered remote event ========'))
self.listened_connection = await self.pool.acquire()
for version, messages in self.messages.items():
for message, message_infos in messages.items():
# event not emit locally
if message_infos['pattern'] == 'event' and 'functions' in message_infos and message_infos['functions']:
# module, submodule, submessage = message.split('.', 2)
# if f'{module}.{submodule}' not in self.injected_self:
uri = f'{version}.{message}'
print(f' - {uri}')
await self.listened_connection.add_listener(uri,
self.to_async_publish,
)
async def publish(self,
version: str,
message: str,
risotto_context: Context,
old_risotto_context: Context,
check_role: bool=False,
internal: bool=True,
**kwargs,
) -> None:
if version not in self.messages or message not in self.messages[version]:
raise ValueError(_(f'cannot find URI "{version}.{message}"'))
# publish to remote
remote_kw = dumps({'kwargs': kwargs,
'context': {'username': risotto_context.username,
'paths': risotto_context.paths,
}
})
# FIXME should be better :/
remote_kw = remote_kw.replace("'", "''")
await risotto_context.connection.execute(f'NOTIFY "{version}.{message}", \'{remote_kw}\'')
def to_async_publish(self,
con: 'asyncpg.connection.Connection',
pid: int,
uri: str,
payload: str,
) -> None:
version, message = uri.split('.', 1)
loop = get_event_loop()
remote_kw = loads(payload)
risotto_context = self.build_new_context(remote_kw['context'],
risotto_context = self.build_new_context(old_risotto_context,
version,
message,
'event',
)
callback = lambda: ensure_future(self._publish(version,
try:
function_objs = self.messages[version][message].get('functions', [])
except KeyError:
raise ValueError(_(f'cannot find message {version}.{message}'))
# do not start a new database connection
if hasattr(old_risotto_context, 'connection'):
# publish to remove
remote_kw = dumps({'kwargs': kwargs,
'context': risotto_context.__dict__,
})
risotto_context.connection = old_risotto_context.connection
# FIXME should be better :/
remote_kw = remote_kw.replace("'", "''")
await risotto_context.connection.execute(f'NOTIFY "{version}.{message}", \'{remote_kw}\'')
return await self.launch(version,
message,
risotto_context,
**remote_kw['kwargs'],
))
loop.call_soon(callback)
async def _publish(self,
version: str,
message: str,
risotto_context: Context,
**kwargs,
) -> None:
config_arguments = await self.load_kwargs_to_config(risotto_context,
f'{version}.{message}',
check_role,
kwargs,
False,
False,
function_objs,
internal,
)
for function_obj in self.messages[version][message]['functions']:
async with self.pool.acquire() as connection:
try:
await self.check_message_type(risotto_context,
kwargs,
)
await connection.set_type_codec(
'json',
encoder=dumps,
@ -233,10 +184,13 @@ class PublishDispatcher:
)
risotto_context.connection = connection
async with connection.transaction():
await self.launch(risotto_context,
return await self.launch(version,
message,
risotto_context,
check_role,
kwargs,
config_arguments,
function_obj,
function_objs,
internal,
)
except CallError as err:
pass
@ -257,13 +211,14 @@ class PublishDispatcher:
class Dispatcher(register.RegisterDispatcher,
Remote,
CallDispatcher,
PublishDispatcher):
""" Manage message (call or publish)
so launch a function when a message is called
"""
def build_new_context(self,
context: dict,
old_risotto_context: Context,
version: str,
message: str,
type: str,
@ -272,8 +227,8 @@ class Dispatcher(register.RegisterDispatcher,
"""
uri = version + '.' + message
risotto_context = Context()
risotto_context.username = context['username']
risotto_context.paths = copy(context['paths'])
risotto_context.username = old_risotto_context.username
risotto_context.paths = copy(old_risotto_context.paths)
risotto_context.paths.append(uri)
risotto_context.uri = uri
risotto_context.type = type
@ -342,15 +297,14 @@ class Dispatcher(register.RegisterDispatcher,
async def check_role(self,
config: Config,
user_login: str,
uri: str,
) -> None:
uri: str) -> None:
async with self.pool.acquire() as connection:
async with connection.transaction():
# Verify if user exists and get ID
sql = '''
SELECT UserId
FROM UserUser
WHERE Login = $1
WHERE UserLogin = $1
'''
user_id = await connection.fetchval(sql,
user_login)
@ -388,16 +342,29 @@ class Dispatcher(register.RegisterDispatcher,
raise NotAllowedError(_(f'You ({user_login}) don\'t have any authorisation to access to "{uri}"'))
async def launch(self,
version: str,
message: str,
risotto_context: Context,
check_role: bool,
kwargs: Dict,
config_arguments: dict,
function_obj: Callable,
function_objs: List,
internal: bool,
) -> Optional[Dict]:
# so send the message
await self.check_message_type(risotto_context,
kwargs)
config_arguments = await self.load_kwargs_to_config(risotto_context,
f'{version}.{message}',
kwargs,
check_role,
internal,
)
# config is ok, so send the message
for function_obj in function_objs:
function = function_obj['function']
risotto_context.module = function_obj['module'].split('.', 1)[0]
submodule_name = function_obj['module']
function_name = function.__name__
info_msg = _(f"in function {function_obj['full_module_name']}.{function_name}")
risotto_context.module = submodule_name.split('.', 1)[0]
info_msg = _(f'in module {submodule_name}.{function_name}')
# build argument for this function
if risotto_context.type == 'rpc':
kw = config_arguments
@ -408,20 +375,18 @@ class Dispatcher(register.RegisterDispatcher,
kw[key] = value
kw['risotto_context'] = risotto_context
returns = await function(self.get_service(function_obj['module']), **kw)
returns = await function(self.injected_self[function_obj['module']], **kw)
if risotto_context.type == 'rpc':
# valid returns
await self.valid_call_returns(risotto_context,
function,
returns,
kwargs,
)
kwargs)
# log the success
await log.info_msg(risotto_context,
{'arguments': kwargs,
'returns': returns},
info_msg,
)
info_msg)
# notification
if function_obj.get('notification'):
notif_version, notif_message = function_obj['notification'].split('.', 1)
@ -433,8 +398,7 @@ class Dispatcher(register.RegisterDispatcher,
await self.publish(notif_version,
notif_message,
risotto_context,
**ret,
)
**ret)
if risotto_context.type == 'rpc':
return returns

View File

@ -29,8 +29,7 @@ def create_context(request):
def register(version: str,
path: str,
):
path: str):
""" Decorator to register function to the http route
"""
def decorator(function):
@ -42,9 +41,7 @@ def register(version: str,
class extra_route_handler:
async def __new__(cls,
request,
):
async def __new__(cls, request):
kwargs = dict(request.match_info)
kwargs['request'] = request
kwargs['risotto_context'] = create_context(request)
@ -99,13 +96,11 @@ async def handle(request):
print_exc()
raise HTTPInternalServerError(reason=str(err))
return Response(text=dumps({'response': text}),
content_type='application/json',
)
content_type='application/json')
async def api(request,
risotto_context,
):
risotto_context):
global TIRAMISU
if not TIRAMISU:
# check all URI that have an associated role
@ -157,8 +152,7 @@ async def get_app(loop):
for version in versions:
api_route = {'function': api,
'version': version,
'path': f'/api/{version}',
}
'path': f'/api/{version}'}
extra_handler = type(api_route['path'], (extra_route_handler,), api_route)
routes.append(get(api_route['path'], extra_handler))
print(f' - {api_route["path"]} (http_get)')
@ -180,10 +174,7 @@ async def get_app(loop):
await dispatcher.register_remote()
print()
await dispatcher.on_join()
return await loop.create_server(app.make_handler(),
'*',
get_config()['http_server']['port'],
)
return await loop.create_server(app.make_handler(), '*', get_config()['http_server']['port'])
TIRAMISU = None

View File

@ -7,7 +7,6 @@ from typing import Callable, Optional, List
from asyncpg import create_pool
from json import dumps, loads
from pkg_resources import iter_entry_points
from traceback import print_exc
import risotto
from .utils import _
from .error import RegistrationError
@ -24,7 +23,7 @@ class Services():
def load_services(self):
for entry_point in iter_entry_points(group='risotto_services'):
self.services.setdefault(entry_point.name, {})
self.services.setdefault(entry_point.name, [])
self.services_loaded = True
def load_modules(self,
@ -33,20 +32,21 @@ class Services():
for entry_point in iter_entry_points(group='risotto_modules'):
service_name, module_name = entry_point.name.split('.')
if limit_services is None or service_name in limit_services:
self.services[service_name][module_name] = entry_point.load()
setattr(self, module_name, entry_point.load())
self.services[service_name].append(module_name)
self.modules_loaded = True
#
# def get_services(self):
# if not self.services_loaded:
# self.load_services()
# return [(service, getattr(self, service)) for service in self.services]
def get_services(self):
if not self.services_loaded:
self.load_services()
return [(s, getattr(self, s)) for s in self.services]
def get_modules(self,
limit_services: Optional[List[str]]=None,
) -> List[str]:
if not self.modules_loaded:
self.load_modules(limit_services=limit_services)
return [(module + '.' + submodule, entry_point) for module, submodules in self.services.items() for submodule, entry_point in submodules.items()]
return [(module + '.' + submodule, getattr(self, submodule)) for module, submodules in self.services.items() for submodule in submodules]
def get_services_list(self):
return self.services.keys()
@ -199,8 +199,7 @@ class RegisterDispatcher:
raise RegistrationError(_(f'the message {message} not exists'))
# xxx submodule can only be register with v1.yyy.xxx..... message
full_module_name = function.__module__
risotto_module_name, submodule_name = full_module_name.split('.')[-3:-1]
risotto_module_name, submodule_name = function.__module__.split('.')[-3:-1]
module_name = risotto_module_name.split('_')[-1]
message_module, message_submodule, message_name = message.split('.', 2)
if message_module not in self.risotto_modules:
@ -225,7 +224,6 @@ class RegisterDispatcher:
register(version,
message,
f'{module_name}.{submodule_name}',
full_module_name,
function,
function_args,
notification,
@ -235,13 +233,11 @@ class RegisterDispatcher:
version: str,
message: str,
module_name: str,
full_module_name: str,
function: Callable,
function_args: list,
notification: Optional[str],
):
self.messages[version][message]['module'] = module_name
self.messages[version][message]['full_module_name'] = full_module_name
self.messages[version][message]['function'] = function
self.messages[version][message]['arguments'] = function_args
if notification:
@ -251,7 +247,6 @@ class RegisterDispatcher:
version: str,
message: str,
module_name: str,
full_module_name: str,
function: Callable,
function_args: list,
notification: Optional[str],
@ -260,10 +255,8 @@ class RegisterDispatcher:
self.messages[version][message]['functions'] = []
dico = {'module': module_name,
'full_module_name': full_module_name,
'function': function,
'arguments': function_args,
}
'arguments': function_args}
if notification and notification:
dico['notification'] = notification
self.messages[version][message]['functions'].append(dico)
@ -278,7 +271,7 @@ class RegisterDispatcher:
try:
self.injected_self[submodule_name] = module.Risotto(test)
except AttributeError as err:
print(_(f'unable to register the module {submodule_name}, this module must have Risotto class'))
raise RegistrationError(_(f'unable to register the module {submodule_name}, this module must have Risotto class'))
def validate(self):
""" check if all messages have a function
@ -307,7 +300,7 @@ class RegisterDispatcher:
)
if truncate:
async with connection.transaction():
await connection.execute('TRUNCATE InfraServer, InfraSite, InfraZone, Log, ProviderDeployment, ProviderFactoryCluster, ProviderFactoryClusterNode, SettingApplicationservice, SettingApplicationServiceDependency, SettingRelease, SettingServer, SettingServermodel, SettingSource, UserRole, UserRoleURI, UserURI, UserUser, InfraServermodel, ProviderZone, ProviderServer, ProviderSource, ProviderApplicationservice, ProviderServermodel')
await connection.execute('TRUNCATE InfraServer, InfraSite, InfraZone, Log, ProviderDeployment, ProviderFactoryCluster, ProviderFactoryClusterNode, SettingApplicationservice, SettingApplicationServiceDependency, SettingRelease, SettingServer, SettingServermodel, SettingSource, UserRole, UserRoleURI, UserURI, UserUser, InfraServermodel, ProviderZone, ProviderServer, ProviderSource, ProviderApplicationservice ProviderServermodel')
async with connection.transaction():
for submodule_name, module in self.injected_self.items():
risotto_context = Context()
@ -316,17 +309,11 @@ class RegisterDispatcher:
risotto_context.type = None
risotto_context.connection = connection
risotto_context.module = submodule_name.split('.', 1)[0]
info_msg = _(f'in function risotto_{submodule_name}.on_join')
info_msg = _(f'in module risotto_{submodule_name}.on_join')
await log.info_msg(risotto_context,
None,
info_msg)
try:
await module.on_join(risotto_context)
except Exception as err:
if get_config()['global']['debug']:
print_exc()
msg = _(f'on_join returns an error in module {submodule_name}: {err}')
await log.error_msg(risotto_context, {}, msg)
async def load(self):
# valid function's arguments

42
src/risotto/remote.py Normal file
View File

@ -0,0 +1,42 @@
from asyncio import get_event_loop, ensure_future
from json import loads
from .context import Context
from .config import get_config
from .utils import _
class Remote:
async def register_remote(self) -> None:
print()
print(_('======== Registered remote event ========'))
self.listened_connection = await self.pool.acquire()
for version, messages in self.messages.items():
for message, message_infos in messages.items():
# event not emit locally
if message_infos['pattern'] == 'event':
module, submodule, submessage = message.split('.', 2)
if f'{module}.{submodule}' not in self.injected_self:
uri = f'{version}.{message}'
print(f' - {uri}')
await self.listened_connection.add_listener(uri, self.to_async_publish)
def to_async_publish(self,
con: 'asyncpg.connection.Connection',
pid: int,
uri: str,
payload: str,
) -> None:
version, message = uri.split('.', 1)
loop = get_event_loop()
remote_kw = loads(payload)
context = Context()
for key, value in remote_kw['context'].items():
setattr(context, key, value)
callback = lambda: ensure_future(self.publish(version,
message,
context,
**remote_kw['kwargs'],
))
loop.call_soon(callback)

View File

@ -392,6 +392,7 @@ async def test_server_created_base():
release_distribution='last',
site_name='site_1',
zones_name=['zones'],
zones_ip=['1.1.1.1'],
)
assert list(config_module.server) == [server_name]
assert set(config_module.server[server_name]) == {'server', 'server_to_deploy', 'funcs_file'}
@ -419,6 +420,7 @@ async def test_server_created_own_sm():
release_distribution='last',
site_name='site_1',
zones_name=['zones'],
zones_ip=['1.1.1.1'],
)
assert list(config_module.server) == [server_name]
assert set(config_module.server[server_name]) == {'server', 'server_to_deploy', 'funcs_file'}
@ -467,6 +469,7 @@ async def test_server_configuration_get():
release_distribution='last',
site_name='site_1',
zones_name=['zones'],
zones_ip=['1.1.1.1'],
)
#
await config_module.server[server_name]['server'].property.read_write()
@ -512,6 +515,7 @@ async def test_server_configuration_deployed():
release_distribution='last',
site_name='site_1',
zones_name=['zones'],
zones_ip=['1.1.1.1'],
)
#
await config_module.server[server_name]['server'].property.read_write()