Merge branch 'develop' into dist/risotto/risotto-2.8.0/develop

This commit is contained in:
Emmanuel Garette 2020-09-16 17:38:04 +02:00
commit be97d757d9
4 changed files with 165 additions and 170 deletions

View File

@ -39,9 +39,9 @@ class Controller:
**kwargs,
):
""" a wrapper to dispatcher's publish"""
version, message = uri.split('.', 1)
if args:
raise ValueError(_(f'the URI "{uri}" can only be published with keyword arguments'))
version, message = uri.split('.', 1)
await dispatcher.publish(version,
message,
risotto_context,

View File

@ -4,6 +4,7 @@ try:
except:
from tiramisu import Config
from tiramisu.error import ValueOptionError
from asyncio import get_event_loop, ensure_future
from traceback import print_exc
from copy import copy
from typing import Dict, Callable, List, Optional
@ -15,7 +16,6 @@ from .logger import log
from .config import get_config
from .context import Context
from . import register
from .remote import Remote
class CallDispatcher:
@ -79,7 +79,7 @@ class CallDispatcher:
""" execute the function associate with specified uri
arguments are validate before
"""
risotto_context = self.build_new_context(old_risotto_context,
risotto_context = self.build_new_context(old_risotto_context.__dict__,
version,
message,
'rpc',
@ -88,20 +88,35 @@ class CallDispatcher:
raise CallError(_(f'cannot find version of message "{version}"'))
if message not in self.messages[version]:
raise CallError(_(f'cannot find message "{version}.{message}"'))
function_objs = [self.messages[version][message]]
function_obj = self.messages[version][message]
# do not start a new database connection
if hasattr(old_risotto_context, 'connection'):
risotto_context.connection = old_risotto_context.connection
return await self.launch(version,
message,
risotto_context,
check_role,
await self.check_message_type(risotto_context,
kwargs,
)
config_arguments = await self.load_kwargs_to_config(risotto_context,
f'{version}.{message}',
kwargs,
check_role,
internal,
)
return await self.launch(risotto_context,
kwargs,
function_objs,
internal,
config_arguments,
function_obj,
)
else:
try:
await self.check_message_type(risotto_context,
kwargs,
)
config_arguments = await self.load_kwargs_to_config(risotto_context,
f'{version}.{message}',
kwargs,
check_role,
internal,
)
async with self.pool.acquire() as connection:
await connection.set_type_codec(
'json',
@ -111,13 +126,10 @@ class CallDispatcher:
)
risotto_context.connection = connection
async with connection.transaction():
return await self.launch(version,
message,
risotto_context,
check_role,
return await self.launch(risotto_context,
kwargs,
function_objs,
internal,
config_arguments,
function_obj,
)
except CallError as err:
raise err
@ -139,66 +151,81 @@ class CallDispatcher:
class PublishDispatcher:
async def register_remote(self) -> None:
print()
print(_('======== Registered remote event ========'))
self.listened_connection = await self.pool.acquire()
for version, messages in self.messages.items():
for message, message_infos in messages.items():
# event not emit locally
if message_infos['pattern'] == 'event' and 'functions' in message_infos and message_infos['functions']:
# module, submodule, submessage = message.split('.', 2)
# if f'{module}.{submodule}' not in self.injected_self:
uri = f'{version}.{message}'
print(f' - {uri}')
await self.listened_connection.add_listener(uri,
self.to_async_publish,
)
async def publish(self,
version: str,
message: str,
old_risotto_context: Context,
check_role: bool=False,
internal: bool=True,
risotto_context: Context,
**kwargs,
) -> None:
risotto_context = self.build_new_context(old_risotto_context,
if version not in self.messages or message not in self.messages[version]:
raise ValueError(_(f'cannot find URI "{version}.{message}"'))
# publish to remote
remote_kw = dumps({'kwargs': kwargs,
'context': {'username': risotto_context.username,
'paths': risotto_context.paths,
}
})
# FIXME should be better :/
remote_kw = remote_kw.replace("'", "''")
await risotto_context.connection.execute(f'NOTIFY "{version}.{message}", \'{remote_kw}\'')
def to_async_publish(self,
con: 'asyncpg.connection.Connection',
pid: int,
uri: str,
payload: str,
) -> None:
version, message = uri.split('.', 1)
loop = get_event_loop()
remote_kw = loads(payload)
risotto_context = self.build_new_context(remote_kw['context'],
version,
message,
'event',
)
try:
function_objs = self.messages[version][message].get('functions', [])
except KeyError:
raise ValueError(_(f'cannot find message {version}.{message}'))
# do not start a new database connection
if hasattr(old_risotto_context, 'connection'):
# publish to remove
remote_kw = dumps({'kwargs': kwargs,
'context': risotto_context.__dict__,
})
risotto_context.connection = old_risotto_context.connection
# FIXME should be better :/
remote_kw = remote_kw.replace("'", "''")
await risotto_context.connection.execute(f'NOTIFY "{version}.{message}", \'{remote_kw}\'')
return await self.launch(version,
message,
risotto_context,
check_role,
kwargs,
function_objs,
internal,
)
async with self.pool.acquire() as connection:
try:
await connection.set_type_codec(
'json',
encoder=dumps,
decoder=loads,
schema='pg_catalog'
)
risotto_context.connection = connection
async with connection.transaction():
return await self.launch(version,
message,
risotto_context,
check_role,
kwargs,
function_objs,
internal,
)
except CallError as err:
pass
except Exception as err:
# if there is a problem with arguments, log and do nothing
if get_config()['global']['debug']:
print_exc()
async with self.pool.acquire() as connection:
callback = lambda: ensure_future(self._publish(version,
message,
risotto_context,
**remote_kw['kwargs'],
))
loop.call_soon(callback)
async def _publish(self,
version: str,
message: str,
risotto_context: Context,
**kwargs,
) -> None:
config_arguments = await self.load_kwargs_to_config(risotto_context,
f'{version}.{message}',
kwargs,
False,
False,
)
for function_obj in self.messages[version][message]['functions']:
print('======', function_obj['function'].__name__)
async with self.pool.acquire() as connection:
try:
await self.check_message_type(risotto_context,
kwargs,
)
await connection.set_type_codec(
'json',
encoder=dumps,
@ -207,18 +234,37 @@ class PublishDispatcher:
)
risotto_context.connection = connection
async with connection.transaction():
await log.error_msg(risotto_context, kwargs, err)
await self.launch(risotto_context,
kwargs,
config_arguments,
function_obj,
)
except CallError as err:
pass
except Exception as err:
# if there is a problem with arguments, log and do nothing
if get_config()['global']['debug']:
print_exc()
async with self.pool.acquire() as connection:
await connection.set_type_codec(
'json',
encoder=dumps,
decoder=loads,
schema='pg_catalog'
)
risotto_context.connection = connection
async with connection.transaction():
await log.error_msg(risotto_context, kwargs, err)
class Dispatcher(register.RegisterDispatcher,
Remote,
CallDispatcher,
PublishDispatcher):
""" Manage message (call or publish)
so launch a function when a message is called
"""
def build_new_context(self,
old_risotto_context: Context,
context: dict,
version: str,
message: str,
type: str,
@ -227,8 +273,8 @@ class Dispatcher(register.RegisterDispatcher,
"""
uri = version + '.' + message
risotto_context = Context()
risotto_context.username = old_risotto_context.username
risotto_context.paths = copy(old_risotto_context.paths)
risotto_context.username = context['username']
risotto_context.paths = copy(context['paths'])
risotto_context.paths.append(uri)
risotto_context.uri = uri
risotto_context.type = type
@ -342,65 +388,56 @@ class Dispatcher(register.RegisterDispatcher,
raise NotAllowedError(_(f'You ({user_login}) don\'t have any authorisation to access to "{uri}"'))
async def launch(self,
version: str,
message: str,
risotto_context: Context,
check_role: bool,
kwargs: Dict,
function_objs: List,
internal: bool,
config_arguments: dict,
function_obj: Callable,
) -> Optional[Dict]:
await self.check_message_type(risotto_context,
kwargs)
config_arguments = await self.load_kwargs_to_config(risotto_context,
f'{version}.{message}',
kwargs,
check_role,
internal,
)
# config is ok, so send the message
for function_obj in function_objs:
function = function_obj['function']
submodule_name = function_obj['module']
function_name = function.__name__
risotto_context.module = submodule_name.split('.', 1)[0]
info_msg = _(f'in module {submodule_name}.{function_name}')
# build argument for this function
if risotto_context.type == 'rpc':
kw = config_arguments
else:
kw = {}
for key, value in config_arguments.items():
if key in function_obj['arguments']:
kw[key] = value
# so send the message
function = function_obj['function']
submodule_name = function_obj['module']
function_name = function.__name__
risotto_context.module = submodule_name.split('.', 1)[0]
info_msg = _(f'in module {submodule_name}.{function_name}')
# build argument for this function
if risotto_context.type == 'rpc':
kw = config_arguments
else:
kw = {}
for key, value in config_arguments.items():
if key in function_obj['arguments']:
kw[key] = value
kw['risotto_context'] = risotto_context
returns = await function(self.injected_self[function_obj['module']], **kw)
if risotto_context.type == 'rpc':
# valid returns
await self.valid_call_returns(risotto_context,
function,
returns,
kwargs)
# log the success
await log.info_msg(risotto_context,
{'arguments': kwargs,
'returns': returns},
info_msg)
# notification
if function_obj.get('notification'):
notif_version, notif_message = function_obj['notification'].split('.', 1)
if not isinstance(returns, list):
send_returns = [returns]
else:
send_returns = returns
for ret in send_returns:
await self.publish(notif_version,
notif_message,
risotto_context,
**ret)
if risotto_context.type == 'rpc':
return returns
kw['risotto_context'] = risotto_context
returns = await function(self.injected_self[function_obj['module']], **kw)
if risotto_context.type == 'rpc':
# valid returns
await self.valid_call_returns(risotto_context,
function,
returns,
kwargs,
)
# log the success
await log.info_msg(risotto_context,
{'arguments': kwargs,
'returns': returns},
info_msg,
)
# notification
if function_obj.get('notification'):
notif_version, notif_message = function_obj['notification'].split('.', 1)
if not isinstance(returns, list):
send_returns = [returns]
else:
send_returns = returns
for ret in send_returns:
await self.publish(notif_version,
notif_message,
risotto_context,
**ret,
)
if risotto_context.type == 'rpc':
return returns
dispatcher = Dispatcher()

View File

@ -300,7 +300,7 @@ class RegisterDispatcher:
)
if truncate:
async with connection.transaction():
await connection.execute('TRUNCATE InfraServer, InfraSite, InfraZone, Log, ProviderDeployment, ProviderFactoryCluster, ProviderFactoryClusterNode, SettingApplicationservice, SettingApplicationServiceDependency, SettingRelease, SettingServer, SettingServermodel, SettingSource, UserRole, UserRoleURI, UserURI, UserUser, InfraServermodel, ProviderZone, ProviderServer, ProviderSource, ProviderApplicationservice ProviderServermodel')
await connection.execute('TRUNCATE InfraServer, InfraSite, InfraZone, Log, ProviderDeployment, ProviderFactoryCluster, ProviderFactoryClusterNode, SettingApplicationservice, SettingApplicationServiceDependency, SettingRelease, SettingServer, SettingServermodel, SettingSource, UserRole, UserRoleURI, UserURI, UserUser, InfraServermodel, ProviderZone, ProviderServer, ProviderSource, ProviderApplicationservice, ProviderServermodel')
async with connection.transaction():
for submodule_name, module in self.injected_self.items():
risotto_context = Context()

View File

@ -1,42 +0,0 @@
from asyncio import get_event_loop, ensure_future
from json import loads
from .context import Context
from .config import get_config
from .utils import _
class Remote:
async def register_remote(self) -> None:
print()
print(_('======== Registered remote event ========'))
self.listened_connection = await self.pool.acquire()
for version, messages in self.messages.items():
for message, message_infos in messages.items():
# event not emit locally
if message_infos['pattern'] == 'event':
module, submodule, submessage = message.split('.', 2)
if f'{module}.{submodule}' not in self.injected_self:
uri = f'{version}.{message}'
print(f' - {uri}')
await self.listened_connection.add_listener(uri, self.to_async_publish)
def to_async_publish(self,
con: 'asyncpg.connection.Connection',
pid: int,
uri: str,
payload: str,
) -> None:
version, message = uri.split('.', 1)
loop = get_event_loop()
remote_kw = loads(payload)
context = Context()
for key, value in remote_kw['context'].items():
setattr(context, key, value)
callback = lambda: ensure_future(self.publish(version,
message,
context,
**remote_kw['kwargs'],
))
loop.call_soon(callback)