risotto/src/risotto/dispatcher.py

347 lines
15 KiB
Python
Raw Normal View History

2019-11-28 14:50:53 +01:00
from tiramisu import Config
from traceback import print_exc
from copy import copy
2019-12-28 12:29:11 +01:00
from typing import Dict, Callable, List, Optional
2019-12-19 15:00:24 +01:00
from json import dumps, loads
2019-11-28 14:50:53 +01:00
2019-12-02 10:29:40 +01:00
from .utils import _
from .error import CallError, NotAllowedError
2019-11-28 14:50:53 +01:00
from .logger import log
2019-12-02 10:45:07 +01:00
from .config import get_config
2019-11-28 14:50:53 +01:00
from .context import Context
2019-12-02 10:29:40 +01:00
from . import register
import asyncpg
2019-11-28 14:50:53 +01:00
2019-12-02 10:29:40 +01:00
class CallDispatcher:
    """ Mixin implementing RPC ("call") messages

    This class does not define them itself but uses ``self.messages``,
    ``self.pool``, ``self.build_new_context`` and ``self.launch``.
    """
    async def valid_call_returns(self,
                                 risotto_context: Context,
                                 function,
                                 returns: Dict,
                                 kwargs: Dict):
        """ validate the value returned by an RPC function

        The response description of the current message is loaded in a
        Config and every returned key/value is set in it.

        :param risotto_context: context of the current message
        :param function: the function that produced `returns` (only used
                         to build error messages)
        :param returns: a dict, or a list of dict when the response
                        description has a true 'multi' information
        :param kwargs: arguments of the call (only used for logging)
        :raises CallError: `returns` has the wrong type, contains an unknown
                           key or an invalid value, or cannot be exported
        :raises ValueError: mandatory keys are missing in the response
        :raises Exception: the message has no response description
        """
        response = self.messages[risotto_context.version][risotto_context.message]['response']
        # must be tested before the first use of `response`, otherwise a
        # missing description ends in an unrelated AttributeError
        if response is None:
            raise Exception('hu?')
        module_name = function.__module__.split('.')[-2]
        function_name = function.__name__
        if response.impl_get_information('multi'):
            if not isinstance(returns, list):
                err = _(f'function {module_name}.{function_name} has to return a list')
                await log.error_msg(risotto_context, kwargs, err)
                raise CallError(str(err))
        else:
            if not isinstance(returns, dict):
                # keep a trace of the unexpected value itself
                await log.error_msg(risotto_context, kwargs, returns)
                err = _(f'function {module_name}.{function_name} has to return a dict')
                await log.error_msg(risotto_context, kwargs, err)
                raise CallError(str(err))
            # from here, always work with a list of dict
            returns = [returns]
        for ret in returns:
            # without this check `ret.items()` raises an AttributeError that
            # is handled below with `key` still unbound
            if not isinstance(ret, dict):
                err = _(f'function {module_name}.{function_name} has to return a list of dict')
                await log.error_msg(risotto_context, kwargs, err)
                raise CallError(str(err))
            # a fresh Config for each returned element
            config = await Config(response,
                                  display_name=lambda self, dyn_name: self.impl_getname())
            await config.property.read_write()
            try:
                for key, value in ret.items():
                    await config.option(key).value.set(value)
            except AttributeError:
                err = _(f'function {module_name}.{function_name} return the unknown parameter "{key}"')
                await log.error_msg(risotto_context, kwargs, err)
                raise CallError(str(err))
            except ValueError:
                err = _(f'function {module_name}.{function_name} return the parameter "{key}" with an unvalid value "{value}"')
                await log.error_msg(risotto_context, kwargs, err)
                raise CallError(str(err))
            await config.property.read_only()
            mandatories = await config.value.mandatory()
            if mandatories:
                mand = [mand.split('.')[-1] for mand in mandatories]
                raise ValueError(_(f'missing parameters in response: {mand} in message "{risotto_context.message}"'))
            try:
                # exporting all values is the last validation step
                await config.value.dict()
            except Exception as exc:
                err = _(f'function {module_name}.{function_name} return an invalid response {exc}')
                await log.error_msg(risotto_context, kwargs, err)
                raise CallError(str(err))

    async def call(self,
                   version: str,
                   message: str,
                   old_risotto_context: Context,
                   check_role: bool=False,
                   **kwargs):
        """ execute the function associate with specified uri
        arguments are validate before

        :param version: version of the message
        :param message: name of the message
        :param old_risotto_context: context of the caller, a new context is
                                    built from it
        :param check_role: forwarded to `launch`
        :param kwargs: arguments of the message
        :return: what `launch` returns
        """
        risotto_context = self.build_new_context(old_risotto_context,
                                                 version,
                                                 message,
                                                 'rpc')
        function_objs = [self.messages[version][message]]
        # do not start a new database connection
        if hasattr(old_risotto_context, 'connection'):
            risotto_context.connection = old_risotto_context.connection
            return await self.launch(version,
                                     message,
                                     risotto_context,
                                     check_role,
                                     kwargs,
                                     function_objs)
        else:
            async with self.pool.acquire() as connection:
                await connection.set_type_codec(
                    'json',
                    encoder=dumps,
                    decoder=loads,
                    schema='pg_catalog'
                )
                risotto_context.connection = connection
                # the whole call runs in a single transaction
                async with connection.transaction():
                    return await self.launch(version,
                                             message,
                                             risotto_context,
                                             check_role,
                                             kwargs,
                                             function_objs)
2019-11-28 14:50:53 +01:00
2019-12-02 10:29:40 +01:00
class PublishDispatcher:
    """ Mixin implementing event ("publish") messages

    Uses ``self.messages``, ``self.pool``, ``self.build_new_context`` and
    ``self.launch``, which are not defined in this class.
    """
    async def publish(self,
                      version: str,
                      message: str,
                      old_risotto_context: Context,
                      check_role: bool=False,
                      **kwargs) -> None:
        """ send an event to every function registered for this message

        :param version: version of the message
        :param message: name of the message
        :param old_risotto_context: context of the caller, a new 'event'
                                    context is built from it
        :param check_role: forwarded to `launch`
        :param kwargs: arguments of the message
        """
        risotto_context = self.build_new_context(old_risotto_context,
                                                 version,
                                                 message,
                                                 'event')
        function_objs = self.messages[version][message].get('functions', [])
        launch_args = (version,
                       message,
                       risotto_context,
                       check_role,
                       kwargs,
                       function_objs)
        if hasattr(old_risotto_context, 'connection'):
            # the caller already owns a database connection: share it
            risotto_context.connection = old_risotto_context.connection
            return await self.launch(*launch_args)
        # no connection yet: take one from the pool for this publish
        async with self.pool.acquire() as connection:
            await connection.set_type_codec('json',
                                            encoder=dumps,
                                            decoder=loads,
                                            schema='pg_catalog')
            risotto_context.connection = connection
            async with connection.transaction():
                return await self.launch(*launch_args)
2019-11-29 16:38:33 +01:00
2019-11-28 14:50:53 +01:00
2019-12-02 10:29:40 +01:00
class Dispatcher(register.RegisterDispatcher, CallDispatcher, PublishDispatcher):
    """ Manage message (call or publish)
    so launch a function when a message is called
    """
    def build_new_context(self,
                          old_risotto_context: Context,
                          version: str,
                          message: str,
                          type: str):
        """ This is a new call or a new publish, so create a new context

        The username is inherited from the previous context and the uri
        ("version.message") is appended to a copy of its paths.
        """
        uri = version + '.' + message
        new_context = Context()
        new_context.username = old_risotto_context.username
        # copy, so the caller's own paths are not modified
        new_context.paths = copy(old_risotto_context.paths)
        new_context.paths.append(uri)
        new_context.uri = uri
        new_context.type = type
        new_context.message = message
        new_context.version = version
        return new_context

    async def check_message_type(self,
                                 risotto_context: Context,
                                 kwargs: Dict):
        """ ensure the message is declared with the pattern of the context type

        :raises CallError: the 'pattern' of the message differs from
                           `risotto_context.type` (the error is logged first)
        """
        pattern = self.messages[risotto_context.version][risotto_context.message]['pattern']
        if pattern == risotto_context.type:
            return
        msg = _(f'{risotto_context.uri} is not a {risotto_context.type} message')
        await log.error_msg(risotto_context, kwargs, msg)
        raise CallError(msg)

    async def load_kwargs_to_config(self,
                                    risotto_context: Context,
                                    uri: str,
                                    kwargs: Dict,
                                    check_role: bool):
        """ create a new Config et set values to it

        :param risotto_context: context of the current message
        :param uri: "version.message", used in error messages and role check
        :param kwargs: arguments received for the message
        :param check_role: ask for a role check (only done if the global
                           'check_role' setting is enabled too)
        :return: the complete and validated arguments
        :raises ValueError: unknown or missing parameter
        """
        config = await Config(self.option)
        await config.property.read_write()
        # select the message, then fill its own options
        await config.option('message').value.set(risotto_context.message)
        message_config = config.option(risotto_context.message)
        for name, content in kwargs.items():
            try:
                await message_config.option(name).value.set(content)
            except AttributeError:
                if get_config()['global']['debug']:
                    print_exc()
                raise ValueError(_(f'unknown parameter in "{uri}": "{name}"'))
        # authorisation is verified with the values just stored
        if check_role and get_config().get('global').get('check_role'):
            await self.check_role(message_config,
                                  risotto_context.username,
                                  uri)
        # check mandatories options
        await config.property.read_only()
        missing = await config.value.mandatory()
        if missing:
            mand = [path.split('.')[-1] for path in missing]
            raise ValueError(_(f'missing parameters in "{uri}": {mand}'))
        return await message_config.value.dict()

    def get_service(self,
                    name: str):
        """ return the object stored under `name` in ``self.injected_self``
        """
        return self.injected_self[name]

    async def check_role(self,
                         config: Config,
                         user_login: str,
                         uri: str) -> None:
        """ verify that a user is allowed to use an uri

        Access is granted when the user owns one of the roles linked to the
        uri and this role either has no attribute, or has an attribute equal
        to the 'ref' information of an option of `config` with a value equal
        to the (stringified) value of this option.

        :raises NotAllowedError: unknown user or no matching role
        """
        async with self.pool.acquire() as connection:
            async with connection.transaction():
                # Verify if user exists and get ID
                select_user = '''
                    SELECT UserId
                    FROM RisottoUser
                    WHERE UserLogin = $1
                '''
                user_id = await connection.fetchval(select_user, user_login)
                if user_id is None:
                    raise NotAllowedError(_(f"You ({user_login}) don't have any account in this application"))
                # Get all references for this message
                refs = {}
                for option in await config.list('all'):
                    ref = await option.information.get('ref')
                    if not ref:
                        continue
                    refs[ref] = str(await option.value.get())
                # Check role
                select_role_uri = '''
                    SELECT RoleName
                    FROM URI, RoleURI
                    WHERE URI.URIName = $1 AND RoleURI.URIId = URI.URIId
                '''
                select_role_user = '''
                    SELECT RoleAttribute, RoleAttributeValue
                    FROM UserRole
                    WHERE RoleUserId = $1 AND RoleName = $2
                '''
                for uri_role in await connection.fetch(select_role_uri, uri):
                    user_roles = await connection.fetch(select_role_user,
                                                        user_id,
                                                        uri_role['rolename'])
                    for user_role in user_roles:
                        attribute = user_role['roleattribute']
                        # no attribute: the role is valid for any value
                        if not attribute or (attribute in refs and
                                             user_role['roleattributevalue'] == refs[attribute]):
                            return
        raise NotAllowedError(_(f'You ({user_login}) don\'t have any authorisation to access to "{uri}"'))

    async def launch(self,
                     version: str,
                     message: str,
                     risotto_context: Context,
                     check_role: bool,
                     kwargs: Dict,
                     function_objs: List) -> Optional[Dict]:
        """ validate the arguments then run the functions of a message

        For an 'rpc' context errors are raised and the value returned by the
        first function is returned (after validation). For other contexts
        errors are logged, the next function is tried and nothing is
        returned.

        :param function_objs: descriptions of the functions to run (keys
                              'function', 'module', 'arguments' and
                              optionally 'notification' are read here)
        """
        is_rpc = risotto_context.type == 'rpc'
        await self.check_message_type(risotto_context, kwargs)
        try:
            config_arguments = await self.load_kwargs_to_config(risotto_context,
                                                                f'{version}.{message}',
                                                                kwargs,
                                                                check_role)
        except Exception as err:
            # if there is a problem with arguments, just send an error and do nothing
            if get_config()['global']['debug']:
                print_exc()
            await log.error_msg(risotto_context, kwargs, err)
            if is_rpc:
                raise
            return
        # config is ok, so send the message
        for function_obj in function_objs:
            function = function_obj['function']
            module_name = function.__module__.split('.')[-2]
            function_name = function.__name__
            info_msg = _(f'in module {module_name}.{function_name}')
            try:
                if is_rpc:
                    # an rpc function receives every argument
                    kw = config_arguments
                else:
                    # other functions only receive the arguments they declared
                    kw = {name: content
                          for name, content in config_arguments.items()
                          if name in function_obj['arguments']}
                kw['risotto_context'] = risotto_context
                returns = await function(self.injected_self[function_obj['module']], **kw)
            except CallError:
                if is_rpc:
                    raise
                continue
            except Exception as err:
                if get_config().get('global').get('debug'):
                    print_exc()
                await log.error_msg(risotto_context, kwargs, err)
                if is_rpc:
                    raise CallError(str(err))
                continue
            else:
                if is_rpc:
                    # valid returns
                    await self.valid_call_returns(risotto_context,
                                                  function,
                                                  returns,
                                                  kwargs)
                # log the success
                await log.info_msg(risotto_context,
                                   {'arguments': kwargs, 'returns': returns},
                                   info_msg)
                # notification: publish every returned element
                if function_obj.get('notification'):
                    notif_version, notif_message = function_obj['notification'].split('.', 1)
                    send_returns = returns if isinstance(returns, list) else [returns]
                    for ret in send_returns:
                        await self.publish(notif_version,
                                           notif_message,
                                           risotto_context,
                                           **ret)
                if is_rpc:
                    return returns
2019-12-02 10:29:40 +01:00
# the single Dispatcher instance of this module, also exposed to `register`
dispatcher = register.dispatcher = Dispatcher()