Compare commits

..

2 Commits

1 changed files with 53 additions and 44 deletions

View File

@ -260,58 +260,67 @@ class PublishDispatcher:
False, False,
False, False,
) )
async with self.pool.acquire() as connection: async with self.pool.acquire() as log_connection:
await connection.set_type_codec( await log_connection.set_type_codec(
'json', 'json',
encoder=dumps, encoder=dumps,
decoder=loads, decoder=loads,
schema='pg_catalog' schema='pg_catalog'
) )
risotto_context.connection = connection async with log_connection.transaction():
for function_obj in self.messages[version][message]['functions']: risotto_context.log_connection = log_connection
function_name = function_obj['function'].__name__ async with self.pool.acquire() as connection:
info_msg = _(f"call function {function_obj['full_module_name']}.{function_name}") await connection.set_type_codec(
try: 'json',
async with connection.transaction(): encoder=dumps,
decoder=loads,
schema='pg_catalog'
)
risotto_context.connection = connection
for function_obj in self.messages[version][message]['functions']:
function_name = function_obj['function'].__name__
info_msg = _(f"call function {function_obj['full_module_name']}.{function_name}")
try: try:
await log.start(risotto_context, async with connection.transaction():
kwargs, try:
info_msg, await log.start(risotto_context,
) kwargs,
await self.check_message_type(risotto_context, info_msg,
kwargs, )
) await self.check_message_type(risotto_context,
await self.launch(risotto_context, kwargs,
kwargs, )
config_arguments, await self.launch(risotto_context,
function_obj, kwargs,
) config_arguments,
# log the success function_obj,
await log.success(risotto_context) )
except CallError as err: # log the success
await log.success(risotto_context)
except CallError as err:
if get_config()['global']['debug']:
print_exc()
await log.failed(risotto_context,
str(err),
)
except CallError:
pass
except Exception as err:
# if there is a problem with arguments, log and do nothing
if get_config()['global']['debug']: if get_config()['global']['debug']:
print_exc() print_exc()
await log.failed(risotto_context, async with self.pool.acquire() as connection:
str(err), await connection.set_type_codec(
) 'json',
except CallError: encoder=dumps,
pass decoder=loads,
except Exception as err: schema='pg_catalog'
# if there is a problem with arguments, log and do nothing )
if get_config()['global']['debug']: risotto_context.connection = connection
print_exc() async with connection.transaction():
async with self.pool.acquire() as connection: await log.failed(risotto_context,
await connection.set_type_codec( str(err),
'json', )
encoder=dumps,
decoder=loads,
schema='pg_catalog'
)
risotto_context.connection = connection
async with connection.transaction():
await log.failed(risotto_context,
str(err),
)
class Dispatcher(register.RegisterDispatcher, class Dispatcher(register.RegisterDispatcher,