This commit is contained in:
2019-12-13 13:55:30 +01:00
parent 7dc6ce7845
commit a7934e37d7
22 changed files with 431 additions and 78 deletions

View File

@ -92,20 +92,20 @@ class CallDispatcher:
try:
tiramisu_config = self.load_kwargs_to_config(risotto_context,
kwargs)
obj = self.messages[version][message]
function_obj = self.messages[version][message]
kw = tiramisu_config.option(message).value.dict()
risotto_context.function = obj['function']
if obj['risotto_context']:
risotto_context.function = function_obj['function']
if function_obj['risotto_context']:
kw['risotto_context'] = risotto_context
if 'database' in obj and obj['database']:
if function_obj['database']:
db_conf = get_config().get('database')
pool = await asyncpg.create_pool(database=db_conf.get('dbname'), user=db_conf.get('user'))
async with pool.acquire() as connection:
risotto_context.connection = connection
async with connection.transaction():
returns = await risotto_context.function(self.injected_self[obj['module']], **kw)
returns = await risotto_context.function(self.injected_self[function_obj['module']], **kw)
else:
returns = await risotto_context.function(self.injected_self[obj['module']], **kw)
returns = await risotto_context.function(self.injected_self[function_obj['module']], **kw)
except CallError as err:
raise err
except Exception as err:
@ -124,8 +124,8 @@ class CallDispatcher:
kwargs,
_(f'returns {returns}'))
# notification
if obj.get('notification'):
notif_version, notif_message = obj['notification'].split('.', 1)
if function_obj.get('notification'):
notif_version, notif_message = function_obj['notification'].split('.', 1)
if not isinstance(returns, list):
send_returns = [returns]
else:
@ -174,7 +174,15 @@ class PublishDispatcher:
if function_obj['risotto_context']:
kw['risotto_context'] = risotto_context
# send event
returns = await function(self.injected_self[function_obj['module']], **kw)
if function_obj['database']:
db_conf = get_config().get('database')
pool = await asyncpg.create_pool(database=db_conf.get('dbname'), user=db_conf.get('user'))
async with pool.acquire() as connection:
risotto_context.connection = connection
async with connection.transaction():
returns = await function(self.injected_self[function_obj['module']], **kw)
else:
returns = await function(self.injected_self[function_obj['module']], **kw)
except Exception as err:
if DEBUG:
print_exc()