445 lines
21 KiB
Python
445 lines
21 KiB
Python
try:
|
|
from tiramisu3 import Config
|
|
from tiramisu3.error import ValueOptionError
|
|
except:
|
|
from tiramisu import Config
|
|
from tiramisu.error import ValueOptionError
|
|
from asyncio import get_event_loop, ensure_future
|
|
from traceback import print_exc
|
|
from copy import copy
|
|
from typing import Dict, Callable, List, Optional
|
|
from json import dumps, loads
|
|
|
|
from .utils import _
|
|
from .error import CallError, NotAllowedError
|
|
from .logger import log
|
|
from .config import get_config
|
|
from .context import Context
|
|
from . import register
|
|
|
|
|
|
class CallDispatcher:
    """Mixin implementing RPC ("call") messages.

    It validates the arguments of a call, runs the registered function and
    checks the value that function returned.

    The mixin does not define ``messages``, ``pool``, ``build_new_context``,
    ``check_message_type``, ``load_kwargs_to_config`` or ``launch``; they have
    to be supplied by the class it is combined with.
    """

    async def valid_call_returns(self,
                                 risotto_context: Context,
                                 function,
                                 returns: Dict,
                                 kwargs: Dict):
        """Validate ``returns`` against the response description of the message.

        Every returned mapping is loaded into a fresh ``Config`` built from
        the message's ``response`` entry, so unknown keys, invalid values and
        missing mandatory values are detected.

        :param risotto_context: context of the current call (``version`` and
            ``message`` select the response description)
        :param function: the function that produced ``returns``; only used to
            build error messages
        :param returns: value returned by ``function`` (a list when the
            response is flagged ``multi``, a dict otherwise)
        :param kwargs: original call arguments, forwarded to the error logger
        :raises CallError: wrong container type, unknown key, invalid value or
            a response that cannot be exported
        :raises ValueError: a mandatory response value is missing
        :raises Exception: the message has no response description
        """
        response = self.messages[risotto_context.version][risotto_context.message]['response']
        # The guard must run before ``response`` is dereferenced below,
        # otherwise a missing description surfaces as an AttributeError.
        if response is None:
            raise Exception('hu?')
        module_name = function.__module__.split('.')[-2]
        function_name = function.__name__
        uri = f'{risotto_context.version}.{risotto_context.message}'

        if response.impl_get_information('multi'):
            if not isinstance(returns, list):
                err = _(f'function {module_name}.{function_name} has to return a list')
                await log.error_msg(risotto_context, kwargs, err)
                raise CallError(str(err))
        else:
            if not isinstance(returns, dict):
                await log.error_msg(risotto_context, kwargs, returns)
                err = _(f'function {module_name}.{function_name} has to return a dict')
                await log.error_msg(risotto_context, kwargs, err)
                raise CallError(str(err))
            # handle the single answer like a one-element "multi" answer
            returns = [returns]

        for ret in returns:
            # An element without ``items`` would otherwise end up in the
            # ``except AttributeError`` branch below with ``key`` unbound
            # (or left over from the previous element).
            if not hasattr(ret, 'items'):
                err = _(f'function {module_name}.{function_name} has to return a list of dict')
                await log.error_msg(risotto_context, kwargs, err)
                raise CallError(str(err))
            async with await Config(response, display_name=lambda self, dyn_name, suffix: self.impl_getname()) as config:
                await config.property.read_write()
                try:
                    for key, value in ret.items():
                        await config.option(key).value.set(value)
                except AttributeError:
                    err = _(f'function {module_name}.{function_name} return the unknown parameter "{key}" for the uri "{uri}"')
                    await log.error_msg(risotto_context, kwargs, err)
                    raise CallError(str(err))
                except ValueError:
                    err = _(f'function {module_name}.{function_name} return the parameter "{key}" with an unvalid value "{value}" for the uri "{uri}"')
                    await log.error_msg(risotto_context, kwargs, err)
                    raise CallError(str(err))
                await config.property.read_only()
                mandatories = await config.value.mandatory()
                if mandatories:
                    mand = [mand.split('.')[-1] for mand in mandatories]
                    raise ValueError(_(f'missing parameters in response of the uri "{uri}": {mand} in message'))
                try:
                    # exporting the values is the final consistency check
                    await config.value.dict()
                except Exception as exc:
                    err = _(f'function {module_name}.{function_name} return an invalid response {exc} for the uri "{uri}"')
                    await log.error_msg(risotto_context, kwargs, err)
                    raise CallError(str(err))

    async def call(self,
                   version: str,
                   message: str,
                   old_risotto_context: Context,
                   check_role: bool=False,
                   internal: bool=True,
                   **kwargs,
                   ):
        """Execute the function associated with the URI ``version.message``.

        The arguments are validated before the function is launched.

        :param version: version part of the URI
        :param message: message part of the URI
        :param old_risotto_context: context of the caller; a new context is
            derived from its ``__dict__``
        :param check_role: forwarded to ``load_kwargs_to_config``
        :param internal: forwarded to ``load_kwargs_to_config``
        :param kwargs: arguments of the message
        :return: whatever ``launch`` returns
        :raises CallError: unknown version or message, or any ``CallError``
            raised while handling the call (re-raised untouched)
        """
        risotto_context = self.build_new_context(old_risotto_context.__dict__,
                                                 version,
                                                 message,
                                                 'rpc',
                                                 )
        if version not in self.messages:
            raise CallError(_(f'cannot find version of message "{version}"'))
        if message not in self.messages[version]:
            raise CallError(_(f'cannot find message "{version}.{message}"'))
        function_obj = self.messages[version][message]

        # do not start a new database connection
        if hasattr(old_risotto_context, 'connection'):
            risotto_context.connection = old_risotto_context.connection
            await self.check_message_type(risotto_context,
                                          kwargs,
                                          )
            config_arguments = await self.load_kwargs_to_config(risotto_context,
                                                                f'{version}.{message}',
                                                                kwargs,
                                                                check_role,
                                                                internal,
                                                                )
            return await self.launch(risotto_context,
                                     kwargs,
                                     config_arguments,
                                     function_obj,
                                     )

        # No connection to reuse: acquire one and run the call in a transaction.
        try:
            await self.check_message_type(risotto_context,
                                          kwargs,
                                          )
            config_arguments = await self.load_kwargs_to_config(risotto_context,
                                                                f'{version}.{message}',
                                                                kwargs,
                                                                check_role,
                                                                internal,
                                                                )
            async with self.pool.acquire() as connection:
                await connection.set_type_codec(
                    'json',
                    encoder=dumps,
                    decoder=loads,
                    schema='pg_catalog'
                )
                risotto_context.connection = connection
                async with connection.transaction():
                    return await self.launch(risotto_context,
                                             kwargs,
                                             config_arguments,
                                             function_obj,
                                             )
        except CallError:
            raise
        except Exception as err:
            # if there is a problem with arguments, just send an error and do nothing
            if get_config()['global']['debug']:
                print_exc()
            # The error is logged through a newly acquired connection, in its
            # own transaction.
            async with self.pool.acquire() as connection:
                await connection.set_type_codec(
                    'json',
                    encoder=dumps,
                    decoder=loads,
                    schema='pg_catalog'
                )
                risotto_context.connection = connection
                async with connection.transaction():
                    await log.error_msg(risotto_context, kwargs, err)
            raise
|
|
|
|
|
|
class PublishDispatcher:
    """Mixin implementing event ("publish") messages.

    Events are sent through PostgreSQL notifications and executed by the
    listeners registered with ``register_remote``.

    The mixin does not define ``messages``, ``pool``, ``build_new_context``,
    ``check_message_type``, ``load_kwargs_to_config`` or ``launch``; they have
    to be supplied by the class it is combined with.
    """

    async def register_remote(self) -> None:
        """Listen on the notification channel of every event that has functions.

        A connection is acquired from the pool and stored in
        ``self.listened_connection``; it is not released by this method.
        The registered URIs are printed on standard output.
        """
        print()
        print(_('======== Registered remote event ========'))
        self.listened_connection = await self.pool.acquire()
        for version, messages in self.messages.items():
            for message, message_infos in messages.items():
                # event not emit locally
                if message_infos['pattern'] != 'event':
                    continue
                if not message_infos.get('functions'):
                    continue
                uri = f'{version}.{message}'
                print(f'    - {uri}')
                await self.listened_connection.add_listener(uri,
                                                            self.to_async_publish,
                                                            )

    async def publish(self,
                      version: str,
                      message: str,
                      risotto_context: Context,
                      **kwargs,
                      ) -> None:
        """Send the event ``version.message`` as a PostgreSQL notification.

        The payload is a JSON document holding ``kwargs`` and the ``username``
        and ``paths`` of ``risotto_context``.

        :param version: version part of the URI
        :param message: message part of the URI
        :param risotto_context: current context; its ``connection`` is used to
            send the notification
        :param kwargs: arguments of the event
        :raises ValueError: the URI is not a known message
        """
        if version not in self.messages or message not in self.messages[version]:
            raise ValueError(_(f'cannot find URI "{version}.{message}"'))

        # publish to remote
        remote_kw = dumps({'kwargs': kwargs,
                           'context': {'username': risotto_context.username,
                                       'paths': risotto_context.paths,
                                       }
                           })
        # pg_notify() takes the channel and the payload as bound parameters,
        # so no manual quoting of the JSON document is needed.
        await risotto_context.connection.execute('SELECT pg_notify($1, $2)',
                                                 f'{version}.{message}',
                                                 remote_kw,
                                                 )

    def to_async_publish(self,
                         con: 'asyncpg.connection.Connection',
                         pid: int,
                         uri: str,
                         payload: str,
                         ) -> None:
        """Listener callback: schedule ``_publish`` for a received notification.

        :param con: connection that received the notification (unused)
        :param pid: first extra value passed by the listener API (unused)
        :param uri: channel name, split into version and message
        :param payload: JSON document produced by ``publish``
        """
        version, message = uri.split('.', 1)
        loop = get_event_loop()
        remote_kw = loads(payload)
        risotto_context = self.build_new_context(remote_kw['context'],
                                                 version,
                                                 message,
                                                 'event',
                                                 )
        # This callback is synchronous: the coroutine is wrapped in a task
        # from a callback scheduled on the loop.
        callback = lambda: ensure_future(self._publish(version,
                                                       message,
                                                       risotto_context,
                                                       **remote_kw['kwargs'],
                                                       ))
        loop.call_soon(callback)

    async def _publish(self,
                       version: str,
                       message: str,
                       risotto_context: Context,
                       **kwargs,
                       ) -> None:
        """Run every function registered for the event ``version.message``.

        Each function runs on its own pool connection, inside a transaction.
        A ``CallError`` is ignored; any other exception is logged and the
        remaining functions are still executed.

        :param version: version part of the URI
        :param message: message part of the URI
        :param risotto_context: context built for this event
        :param kwargs: arguments of the event
        """
        config_arguments = await self.load_kwargs_to_config(risotto_context,
                                                            f'{version}.{message}',
                                                            kwargs,
                                                            False,
                                                            False,
                                                            )
        for function_obj in self.messages[version][message]['functions']:
            print('======', function_obj['function'].__name__)
            failure = None
            async with self.pool.acquire() as connection:
                try:
                    await self.check_message_type(risotto_context,
                                                  kwargs,
                                                  )
                    await connection.set_type_codec(
                        'json',
                        encoder=dumps,
                        decoder=loads,
                        schema='pg_catalog'
                    )
                    risotto_context.connection = connection
                    async with connection.transaction():
                        await self.launch(risotto_context,
                                          kwargs,
                                          config_arguments,
                                          function_obj,
                                          )
                except CallError:
                    # ignored, the next function is still executed
                    pass
                except Exception as err:
                    # if there is a problem with arguments, log and do nothing
                    if get_config()['global']['debug']:
                        print_exc()
                    failure = err
            if failure is None:
                continue
            # Log only once the first connection is back in the pool, so that
            # a single event never holds two pool connections at the same time.
            async with self.pool.acquire() as connection:
                await connection.set_type_codec(
                    'json',
                    encoder=dumps,
                    decoder=loads,
                    schema='pg_catalog'
                )
                risotto_context.connection = connection
                async with connection.transaction():
                    await log.error_msg(risotto_context, kwargs, failure)
|
|
|
|
|
|
class Dispatcher(register.RegisterDispatcher,
                 CallDispatcher,
                 PublishDispatcher):
    """Manage messages (call or publish).

    Launches the registered function(s) when a message is called or
    published.
    """

    def build_new_context(self,
                          context: dict,
                          version: str,
                          message: str,
                          type: str,
                          ) -> Context:
        """Create the context of a new call or a new publish.

        :param context: mapping providing at least ``username`` and ``paths``
        :param version: version part of the URI
        :param message: message part of the URI
        :param type: message type stored on the context (the callers in this
            file pass ``'rpc'`` or ``'event'``)
        :return: a new ``Context``; ``paths`` is a shallow copy of the
            received one with the new URI appended
        """
        uri = version + '.' + message
        risotto_context = Context()
        risotto_context.username = context['username']
        # copy, so the caller's list of paths is not extended
        risotto_context.paths = copy(context['paths'])
        risotto_context.paths.append(uri)
        risotto_context.uri = uri
        risotto_context.type = type
        risotto_context.message = message
        risotto_context.version = version
        return risotto_context

    async def check_message_type(self,
                                 risotto_context: Context,
                                 kwargs: Dict,
                                 ) -> None:
        """Ensure the message pattern matches the type of the context.

        :param risotto_context: current context
        :param kwargs: arguments of the message, forwarded to the error logger
        :raises CallError: the pattern declared for the message differs from
            ``risotto_context.type`` (the error is logged first)
        """
        if self.messages[risotto_context.version][risotto_context.message]['pattern'] != risotto_context.type:
            msg = _(f'{risotto_context.uri} is not a {risotto_context.type} message')
            await log.error_msg(risotto_context, kwargs, msg)
            raise CallError(msg)

    async def load_kwargs_to_config(self,
                                    risotto_context: Context,
                                    uri: str,
                                    kwargs: Dict,
                                    check_role: bool,
                                    internal: bool,
                                    ):
        """Create a new ``Config`` and set the message arguments in it.

        :param risotto_context: current context (its ``username`` is used for
            the role check)
        :param uri: full URI of the message (``version.message``)
        :param kwargs: arguments received for the message
        :param check_role: when true, and when the ``check_role`` entry of the
            ``global`` configuration is set, ``check_role`` is called
        :param internal: when true, arguments whose name starts with ``_``
            bypass the ``Config`` and are returned as they are
        :return: the values exported from the ``Config`` for this URI, updated
            with the bypassed arguments
        :raises ValueError: unknown, invalid or missing parameter
        """
        # create a new config
        async with await Config(self.option) as config:
            await config.property.read_write()
            # set message's option
            await config.option('message').value.set(uri)
            # store values
            subconfig = config.option(uri)
            extra_parameters = {}
            for key, value in kwargs.items():
                if internal and key.startswith('_'):
                    extra_parameters[key] = value
                    continue
                try:
                    await subconfig.option(key).value.set(value)
                except AttributeError:
                    if get_config()['global']['debug']:
                        print_exc()
                    raise ValueError(_(f'unknown parameter in "{uri}": "{key}"'))
                except ValueOptionError as err:
                    # keep the validation error as the cause
                    raise ValueError(_(f'invalid parameter in "{uri}": {err}')) from err
            # check mandatories options
            if check_role and get_config().get('global').get('check_role'):
                await self.check_role(subconfig,
                                      risotto_context.username,
                                      uri)
            await config.property.read_only()
            mandatories = await config.value.mandatory()
            if mandatories:
                mand = [mand.split('.')[-1] for mand in mandatories]
                raise ValueError(_(f'missing parameters in "{uri}": {mand}'))
            # return complete and validated kwargs
            parameters = await subconfig.value.dict()
            if extra_parameters:
                parameters.update(extra_parameters)
        return parameters

    def get_service(self,
                    name: str):
        """Return the object stored in ``injected_self`` under ``name``."""
        return self.injected_self[name]

    async def check_role(self,
                         config: Config,
                         user_login: str,
                         uri: str) -> None:
        """Verify that ``user_login`` is allowed to use ``uri``.

        Access is granted when the user owns a role attached to the URI that
        either has no attribute, or whose attribute/value pair matches one of
        the ``ref`` informations of the message options.

        :param config: option description of the message, already filled with
            the received values
        :param user_login: login of the user
        :param uri: full URI of the message
        :raises NotAllowedError: unknown user, or no matching role
        """
        async with self.pool.acquire() as connection:
            async with connection.transaction():
                # Verify if user exists and get ID
                sql = '''
                    SELECT UserId
                    FROM UserUser
                    WHERE UserLogin = $1
                '''
                user_id = await connection.fetchval(sql,
                                                    user_login)
                if user_id is None:
                    raise NotAllowedError(_(f"You ({user_login}) don't have any account in this application"))

                # Get all references for this message
                refs = {}
                for option in await config.list('all'):
                    ref = await option.information.get('ref')
                    if ref:
                        refs[ref] = str(await option.value.get())

                # Check role
                select_role_uri = '''
                    SELECT RoleName
                    FROM UserURI, UserRoleURI
                    WHERE UserURI.URIName = $1 AND UserRoleURI.URIId = UserURI.URIId
                '''
                select_role_user = '''
                    SELECT RoleAttribute, RoleAttributeValue
                    FROM UserRole
                    WHERE RoleUserId = $1 AND RoleName = $2
                '''
                for uri_role in await connection.fetch(select_role_uri,
                                                       uri):
                    for user_role in await connection.fetch(select_role_user,
                                                            user_id,
                                                            uri_role['rolename']):
                        attribute = user_role['roleattribute']
                        # a role without attribute is valid for every value
                        if not attribute:
                            return
                        if attribute in refs and \
                                user_role['roleattributevalue'] == refs[attribute]:
                            return
                raise NotAllowedError(_(f'You ({user_login}) don\'t have any authorisation to access to "{uri}"'))

    async def launch(self,
                     risotto_context: Context,
                     kwargs: Dict,
                     config_arguments: dict,
                     function_obj: Dict,
                     ) -> Optional[Dict]:
        """Run the function described by ``function_obj`` and log the result.

        :param risotto_context: current context; its ``module`` attribute is
            set from the module name found in ``function_obj``
        :param kwargs: arguments as received, only used for logging and for
            the validation error messages
        :param config_arguments: validated arguments; not modified
        :param function_obj: description of the function to run; the keys
            ``function`` and ``module`` are read, ``arguments`` for non-rpc
            messages, and ``notification`` when present
        :return: what the function returned for an ``rpc`` message (a list
            when the function returned one), ``None`` otherwise
        """
        # so send the message
        function = function_obj['function']
        submodule_name = function_obj['module']
        function_name = function.__name__
        risotto_context.module = submodule_name.split('.', 1)[0]
        info_msg = _(f'in module {submodule_name}.{function_name}')
        # build argument for this function
        if risotto_context.type == 'rpc':
            # work on a copy: the context is injected below and must not leak
            # into the mapping owned by the caller
            kw = dict(config_arguments)
        else:
            # only pass the arguments listed for this function
            accepted = function_obj['arguments']
            kw = {key: value
                  for key, value in config_arguments.items()
                  if key in accepted}

        kw['risotto_context'] = risotto_context
        returns = await function(self.injected_self[function_obj['module']], **kw)
        if risotto_context.type == 'rpc':
            # valid returns
            await self.valid_call_returns(risotto_context,
                                          function,
                                          returns,
                                          kwargs,
                                          )
        # log the success
        await log.info_msg(risotto_context,
                           {'arguments': kwargs,
                            'returns': returns},
                           info_msg,
                           )
        # notification
        if function_obj.get('notification'):
            notif_version, notif_message = function_obj['notification'].split('.', 1)
            send_returns = returns if isinstance(returns, list) else [returns]
            # one notification per returned element
            for ret in send_returns:
                await self.publish(notif_version,
                                   notif_message,
                                   risotto_context,
                                   **ret,
                                   )
        if risotto_context.type == 'rpc':
            return returns
|
|
|
|
|
|
# Build the module-wide dispatcher instance and expose the same object on the
# ``register`` module.
dispatcher = register.dispatcher = Dispatcher()
|