Merge branch 'develop' into dist/risotto/risotto-2.8.0/develop

This commit is contained in:
Emmanuel Garette 2021-04-25 20:32:16 +02:00
commit c63170be1d
4 changed files with 191 additions and 176 deletions

View File

@ -3,3 +3,11 @@ class Context:
self.paths = [] self.paths = []
self.context_id = None self.context_id = None
self.start_id = None self.start_id = None
def copy(self):
context = Context()
for key, value in self.__dict__.items():
if key.startswith('__'):
continue
setattr(context, key, value)
return context

View File

@ -89,7 +89,6 @@ class CallDispatcher:
if hasattr(old_risotto_context, 'connection'): if hasattr(old_risotto_context, 'connection'):
# do not start a new database connection # do not start a new database connection
risotto_context.connection = old_risotto_context.connection risotto_context.connection = old_risotto_context.connection
risotto_context.log_connection = old_risotto_context.log_connection
await log.start(risotto_context, await log.start(risotto_context,
kwargs, kwargs,
info_msg, info_msg,
@ -119,14 +118,6 @@ class CallDispatcher:
raise CallError(err) from err raise CallError(err) from err
else: else:
error = None error = None
async with self.pool.acquire() as log_connection:
await log_connection.set_type_codec(
'json',
encoder=dumps,
decoder=loads,
schema='pg_catalog'
)
risotto_context.log_connection = log_connection
try: try:
async with self.pool.acquire() as connection: async with self.pool.acquire() as connection:
await connection.set_type_codec( await connection.set_type_codec(
@ -198,8 +189,6 @@ class PublishDispatcher:
for message, message_infos in messages.items(): for message, message_infos in messages.items():
# event not emit locally # event not emit locally
if message_infos['pattern'] == 'event' and 'functions' in message_infos and message_infos['functions']: if message_infos['pattern'] == 'event' and 'functions' in message_infos and message_infos['functions']:
# module, submodule, submessage = message.split('.', 2)
# if f'{module}.{submodule}' not in self.injected_self:
uri = f'{version}.{message}' uri = f'{version}.{message}'
print(f' - {uri}') print(f' - {uri}')
await self.listened_connection.add_listener(uri, await self.listened_connection.add_listener(uri,
@ -235,21 +224,34 @@ class PublishDispatcher:
version, message = uri.split('.', 1) version, message = uri.split('.', 1)
loop = get_event_loop() loop = get_event_loop()
remote_kw = loads(payload) remote_kw = loads(payload)
for function_obj in self.messages[version][message]['functions']:
risotto_context = self.build_new_context(remote_kw['context'], risotto_context = self.build_new_context(remote_kw['context'],
version, version,
message, message,
'event', 'event',
) )
callback = lambda: ensure_future(self._publish(version, callback = self.get_callback(version, message, function_obj, risotto_context, remote_kw['kwargs'],)
message,
risotto_context,
**remote_kw['kwargs'],
))
loop.call_soon(callback) loop.call_soon(callback)
def get_callback(self,
version,
message,
function_obj,
risotto_context,
kwargs,
):
return lambda: ensure_future(self._publish(version,
message,
function_obj,
risotto_context,
**kwargs,
))
async def _publish(self, async def _publish(self,
version: str, version: str,
message: str, message: str,
function_obj,
risotto_context: Context, risotto_context: Context,
**kwargs, **kwargs,
) -> None: ) -> None:
@ -259,15 +261,6 @@ class PublishDispatcher:
False, False,
False, False,
) )
for function_obj in self.messages[version][message]['functions']:
async with self.pool.acquire() as log_connection:
await log_connection.set_type_codec(
'json',
encoder=dumps,
decoder=loads,
schema='pg_catalog'
)
risotto_context.log_connection = log_connection
async with self.pool.acquire() as connection: async with self.pool.acquire() as connection:
await connection.set_type_codec( await connection.set_type_codec(
'json', 'json',
@ -307,15 +300,6 @@ class PublishDispatcher:
# if there is a problem with arguments, log and do nothing # if there is a problem with arguments, log and do nothing
if get_config()['global']['debug']: if get_config()['global']['debug']:
print_exc() print_exc()
async with self.pool.acquire() as connection:
await connection.set_type_codec(
'json',
encoder=dumps,
decoder=loads,
schema='pg_catalog'
)
risotto_context.connection = connection
async with connection.transaction():
await log.failed(risotto_context, await log.failed(risotto_context,
str(err), str(err),
) )
@ -346,6 +330,7 @@ class Dispatcher(register.RegisterDispatcher,
risotto_context.type = type risotto_context.type = type
risotto_context.message = message risotto_context.message = message
risotto_context.version = version risotto_context.version = version
risotto_context.pool = self.pool
return risotto_context return risotto_context
async def check_message_type(self, async def check_message_type(self,

View File

@ -2,15 +2,34 @@ from typing import Dict, Any, Optional
from json import dumps, loads from json import dumps, loads
from asyncpg.exceptions import UndefinedTableError from asyncpg.exceptions import UndefinedTableError
from datetime import datetime from datetime import datetime
from asyncio import Lock
from .context import Context from .context import Context
from .utils import _ from .utils import _
from .config import get_config from .config import get_config
database_lock = Lock()
class Logger: class Logger:
""" An object to manager log """ An object to manager log
""" """
def __init__(self) -> None:
self.log_connection = None
async def get_connection(self,
risotto_context: Context,
):
if not self.log_connection:
self.log_connection = await risotto_context.pool.acquire()
await self.log_connection.set_type_codec(
'json',
encoder=dumps,
decoder=loads,
schema='pg_catalog'
)
return self.log_connection
async def insert(self, async def insert(self,
msg: str, msg: str,
risotto_context: Context, risotto_context: Context,
@ -38,8 +57,9 @@ class Logger:
sql = insert + ') ' + values + ') RETURNING LogId' sql = insert + ') ' + values + ') RETURNING LogId'
try: try:
async with risotto_context.log_connection.transaction(): async with database_lock:
log_id = await risotto_context.log_connection.fetchval(sql, *args) connection = await self.get_connection(risotto_context)
log_id = await connection.fetchval(sql, *args)
if context_id is None and start: if context_id is None and start:
risotto_context.context_id = log_id risotto_context.context_id = log_id
if start: if start:
@ -61,8 +81,9 @@ class Logger:
sql += ' AND URI = $3' sql += ' AND URI = $3'
args.append(uri) args.append(uri)
ret = [] ret = []
async with risotto_context.log_connection.transaction(): async with database_lock:
for row in await risotto_context.log_connection.fetch(*args): connection = await self.get_connection(risotto_context)
for row in await connection.fetch(*args):
d = {} d = {}
for key, value in row.items(): for key, value in row.items():
if key in ['kwargs', 'returns']: if key in ['kwargs', 'returns']:
@ -173,8 +194,9 @@ class Logger:
args.append(dumps(returns)) args.append(dumps(returns))
sql += """WHERE LogId = $1 sql += """WHERE LogId = $1
""" """
async with risotto_context.log_connection.transaction(): async with database_lock:
await risotto_context.log_connection.execute(sql, connection = await self.get_connection(risotto_context)
await connection.execute(sql,
risotto_context.start_id, risotto_context.start_id,
*args, *args,
) )
@ -196,8 +218,9 @@ class Logger:
Msg = $3 Msg = $3
WHERE LogId = $1 WHERE LogId = $1
""" """
async with risotto_context.log_connection.transaction(): async with database_lock:
await risotto_context.log_connection.execute(sql, connection = await self.get_connection(risotto_context)
await connection.execute(sql,
risotto_context.start_id, risotto_context.start_id,
datetime.now(), datetime.now(),
err, err,

View File

@ -297,7 +297,6 @@ class RegisterDispatcher:
truncate: bool=False, truncate: bool=False,
) -> None: ) -> None:
internal_user = get_config()['global']['internal_user'] internal_user = get_config()['global']['internal_user']
async with self.pool.acquire() as log_connection:
async with self.pool.acquire() as connection: async with self.pool.acquire() as connection:
await connection.set_type_codec( await connection.set_type_codec(
'json', 'json',
@ -314,7 +313,7 @@ class RegisterDispatcher:
risotto_context.username = internal_user risotto_context.username = internal_user
risotto_context.paths.append(f'internal.{submodule_name}.on_join') risotto_context.paths.append(f'internal.{submodule_name}.on_join')
risotto_context.type = None risotto_context.type = None
risotto_context.log_connection = log_connection risotto_context.pool = self.pool
risotto_context.connection = connection risotto_context.connection = connection
risotto_context.module = submodule_name.split('.', 1)[0] risotto_context.module = submodule_name.split('.', 1)[0]
info_msg = _(f'in function risotto_{submodule_name}.on_join') info_msg = _(f'in function risotto_{submodule_name}.on_join')