Compare commits

..

No commits in common. "2a985757900209181fd5a4de3934c7986c535ce3" and "52209a5ebd05df89913c23680c5beb98c12d5804" have entirely different histories.

4 changed files with 178 additions and 173 deletions

View File

@ -1,12 +1,12 @@
CREATE TABLE RisottoLog( CREATE TABLE RisottoLog(
LogId SERIAL PRIMARY KEY, LogId SERIAL PRIMARY KEY,
ContextId INTEGER,
Msg VARCHAR(255) NOT NULL, Msg VARCHAR(255) NOT NULL,
URI VARCHAR(255), URI VARCHAR(255),
URIS VARCHAR(255), URIS VARCHAR(255),
UserLogin VARCHAR(100) NOT NULL, UserLogin VARCHAR(100) NOT NULL,
Level VARCHAR(10) NOT NULL, Level VARCHAR(10) NOT NULL,
Kwargs JSON, ContextId INTEGER,
Data JSON,
Returns JSON, Returns JSON,
StartDate timestamp DEFAULT current_timestamp, StartDate timestamp DEFAULT current_timestamp,
StopDate timestamp StopDate timestamp

View File

@ -126,59 +126,60 @@ class CallDispatcher:
decoder=loads, decoder=loads,
schema='pg_catalog' schema='pg_catalog'
) )
risotto_context.log_connection = log_connection async with log_connection.transaction():
try: try:
async with self.pool.acquire() as connection: risotto_context.log_connection = log_connection
await connection.set_type_codec( async with self.pool.acquire() as connection:
'json', await connection.set_type_codec(
encoder=dumps, 'json',
decoder=loads, encoder=dumps,
schema='pg_catalog' decoder=loads,
) schema='pg_catalog'
risotto_context.connection = connection )
async with connection.transaction(): risotto_context.connection = connection
try: async with connection.transaction():
await log.start(risotto_context, try:
kwargs, await log.start(risotto_context,
info_msg, kwargs,
) info_msg,
await self.check_message_type(risotto_context, )
kwargs, await self.check_message_type(risotto_context,
) kwargs,
config_arguments = await self.load_kwargs_to_config(risotto_context, )
f'{version}.{message}', config_arguments = await self.load_kwargs_to_config(risotto_context,
kwargs, f'{version}.{message}',
check_role, kwargs,
internal, check_role,
) internal,
ret = await self.launch(risotto_context, )
kwargs, ret = await self.launch(risotto_context,
config_arguments, kwargs,
function_obj, config_arguments,
) function_obj,
# log the success )
await log.success(risotto_context, # log the success
ret, await log.success(risotto_context,
) ret,
if not internal and isinstance(ret, dict): )
ret['context_id'] = risotto_context.context_id if not internal and isinstance(ret, dict):
except CallError as err: ret['context_id'] = risotto_context.context_id
if get_config()['global']['debug']: except CallError as err:
print_exc() if get_config()['global']['debug']:
await log.failed(risotto_context, print_exc()
str(err), await log.failed(risotto_context,
) str(err),
raise err from err )
except CallError as err: raise err from err
error = err except CallError as err:
except Exception as err: error = err
# if there is a problem with arguments, just send an error and do nothing except Exception as err:
if get_config()['global']['debug']: # if there is a problem with arguments, just send an error and do nothing
print_exc() if get_config()['global']['debug']:
await log.failed(risotto_context, print_exc()
str(err), await log.failed(risotto_context,
) str(err),
error = err )
error = err
if error: if error:
if not internal: if not internal:
err = CallError(str(error)) err = CallError(str(error))
@ -259,14 +260,14 @@ class PublishDispatcher:
False, False,
False, False,
) )
for function_obj in self.messages[version][message]['functions']: async with self.pool.acquire() as log_connection:
async with self.pool.acquire() as log_connection: await log_connection.set_type_codec(
await log_connection.set_type_codec( 'json',
'json', encoder=dumps,
encoder=dumps, decoder=loads,
decoder=loads, schema='pg_catalog'
schema='pg_catalog' )
) async with log_connection.transaction():
risotto_context.log_connection = log_connection risotto_context.log_connection = log_connection
async with self.pool.acquire() as connection: async with self.pool.acquire() as connection:
await connection.set_type_codec( await connection.set_type_codec(
@ -276,49 +277,50 @@ class PublishDispatcher:
schema='pg_catalog' schema='pg_catalog'
) )
risotto_context.connection = connection risotto_context.connection = connection
function_name = function_obj['function'].__name__ for function_obj in self.messages[version][message]['functions']:
info_msg = _(f"call function {function_obj['full_module_name']}.{function_name}") function_name = function_obj['function'].__name__
try: info_msg = _(f"call function {function_obj['full_module_name']}.{function_name}")
async with connection.transaction(): try:
try:
await log.start(risotto_context,
kwargs,
info_msg,
)
await self.check_message_type(risotto_context,
kwargs,
)
await self.launch(risotto_context,
kwargs,
config_arguments,
function_obj,
)
# log the success
await log.success(risotto_context)
except CallError as err:
if get_config()['global']['debug']:
print_exc()
await log.failed(risotto_context,
str(err),
)
except CallError:
pass
except Exception as err:
# if there is a problem with arguments, log and do nothing
if get_config()['global']['debug']:
print_exc()
async with self.pool.acquire() as connection:
await connection.set_type_codec(
'json',
encoder=dumps,
decoder=loads,
schema='pg_catalog'
)
risotto_context.connection = connection
async with connection.transaction(): async with connection.transaction():
await log.failed(risotto_context, try:
str(err), await log.start(risotto_context,
) kwargs,
info_msg,
)
await self.check_message_type(risotto_context,
kwargs,
)
await self.launch(risotto_context,
kwargs,
config_arguments,
function_obj,
)
# log the success
await log.success(risotto_context)
except CallError as err:
if get_config()['global']['debug']:
print_exc()
await log.failed(risotto_context,
str(err),
)
except CallError:
pass
except Exception as err:
# if there is a problem with arguments, log and do nothing
if get_config()['global']['debug']:
print_exc()
async with self.pool.acquire() as connection:
await connection.set_type_codec(
'json',
encoder=dumps,
decoder=loads,
schema='pg_catalog'
)
risotto_context.connection = connection
async with connection.transaction():
await log.failed(risotto_context,
str(err),
)
class Dispatcher(register.RegisterDispatcher, class Dispatcher(register.RegisterDispatcher,

View File

@ -13,24 +13,24 @@ class Logger:
""" """
async def insert(self, async def insert(self,
msg: str, msg: str,
uri: str,
uris: str,
risotto_context: Context, risotto_context: Context,
level: str, level: str,
kwargs: Any=None, data: Any=None,
start: bool=False, start: bool=False,
) -> None: ) -> None:
uri = self._get_last_uri(risotto_context)
uris = " ".join(risotto_context.paths)
insert = 'INSERT INTO RisottoLog(Msg, URI, URIS, UserLogin, Level' insert = 'INSERT INTO RisottoLog(Msg, URI, URIS, UserLogin, Level'
values = 'VALUES($1,$2,$3,$4,$5' values = 'VALUES($1,$2,$3,$4,$5'
args = [msg, uri, uris, risotto_context.username, level] args = [msg, uri, uris, risotto_context.username, level]
if kwargs: if data:
insert += ', Kwargs' insert += ', Data'
values += ',$6' values += ',$6'
args.append(dumps(kwargs)) args.append(dumps(data))
context_id = risotto_context.context_id context_id = risotto_context.context_id
if context_id is not None: if context_id is not None:
insert += ', ContextId' insert += ', ContextId'
if kwargs: if data:
values += ',$7' values += ',$7'
else: else:
values += ',$6' values += ',$6'
@ -38,8 +38,7 @@ class Logger:
sql = insert + ') ' + values + ') RETURNING LogId' sql = insert + ') ' + values + ') RETURNING LogId'
try: try:
async with risotto_context.log_connection.transaction(): log_id = await risotto_context.log_connection.fetchval(sql, *args)
log_id = await risotto_context.log_connection.fetchval(sql, *args)
if context_id is None and start: if context_id is None and start:
risotto_context.context_id = log_id risotto_context.context_id = log_id
if start: if start:
@ -52,7 +51,7 @@ class Logger:
context_id: int, context_id: int,
uri: Optional[str], uri: Optional[str],
) -> list: ) -> list:
sql = '''SELECT Msg as msg, URI as uri_name, URIS as uris, UserLogin as user_login, Level as level, Kwargs as kwargs, Returns as returns, StartDate as start_date, StopDate as stop_date sql = '''SELECT Msg as msg, URI as uri_name, URIS as uris, UserLogin as user_login, Level as level, Data as data, StartDate as start_date, StopDate as stop_date
FROM RisottoLog FROM RisottoLog
WHERE UserLogin = $1 AND (LogId = $2 OR ContextId = $2) WHERE UserLogin = $1 AND (LogId = $2 OR ContextId = $2)
''' '''
@ -61,23 +60,20 @@ class Logger:
sql += ' AND URI = $3' sql += ' AND URI = $3'
args.append(uri) args.append(uri)
ret = [] ret = []
async with risotto_context.log_connection.transaction(): for row in await risotto_context.log_connection.fetch(*args):
for row in await risotto_context.log_connection.fetch(*args): d = {}
d = {} for key, value in row.items():
for key, value in row.items(): if key == 'data':
if key in ['kwargs', 'returns']: if isinstance(value, dict):
if isinstance(value, dict): pass
pass elif not value:
elif not value: value = {}
value = {} else:
else: value = loads(value)
value = loads(value) elif key in ['start_date', 'stop_date']:
if key == 'uris': value = str(value)
value = value.split(' ') d[key] = value
elif key in ['start_date', 'stop_date']: ret.append(d)
value = str(value)
d[key] = value
ret.append(d)
return ret return ret
def _get_last_uri(self, def _get_last_uri(self,
@ -115,6 +111,8 @@ class Logger:
paths_msg = self._get_message_paths(risotto_context) paths_msg = self._get_message_paths(risotto_context)
print(_(f'{risotto_context.username}: ERROR: {error} ({paths_msg} with arguments "{arguments}": {msg})')) print(_(f'{risotto_context.username}: ERROR: {error} ({paths_msg} with arguments "{arguments}": {msg})'))
await self.insert(msg, await self.insert(msg,
self._get_last_uri(risotto_context),
paths_msg,
risotto_context, risotto_context,
'Error', 'Error',
arguments, arguments,
@ -131,6 +129,8 @@ class Logger:
if get_config()['global']['debug']: if get_config()['global']['debug']:
print(_(f'{risotto_context.username}: INFO:{paths_msg}: {msg}')) print(_(f'{risotto_context.username}: INFO:{paths_msg}: {msg}'))
await self.insert(msg, await self.insert(msg,
self._get_last_uri(risotto_context),
paths_msg,
risotto_context, risotto_context,
'Info', 'Info',
arguments, arguments,
@ -149,6 +149,8 @@ class Logger:
context = '' context = ''
print(_(f'{risotto_context.username}: START{context}:{paths_msg}: {msg}')) print(_(f'{risotto_context.username}: START{context}:{paths_msg}: {msg}'))
await self.insert(msg, await self.insert(msg,
self._get_last_uri(risotto_context),
paths_msg,
risotto_context, risotto_context,
'Start', 'Start',
arguments, arguments,
@ -173,11 +175,10 @@ class Logger:
args.append(dumps(returns)) args.append(dumps(returns))
sql += """WHERE LogId = $1 sql += """WHERE LogId = $1
""" """
async with risotto_context.log_connection.transaction(): await risotto_context.log_connection.execute(sql,
await risotto_context.log_connection.execute(sql, risotto_context.start_id,
risotto_context.start_id, *args,
*args, )
)
async def failed(self, async def failed(self,
risotto_context: Context, risotto_context: Context,
@ -189,19 +190,18 @@ class Logger:
context = f'({risotto_context.context_id})' context = f'({risotto_context.context_id})'
else: else:
context = '' context = ''
print(_(f'{risotto_context.username}: FAILED({risotto_context.context_id}):{paths_msg}: {err}')) print(_(f'{risotto_context.username}: FAILED({risotto_context.context_id}):{paths_msg}: err'))
sql = """UPDATE RisottoLog sql = """UPDATE RisottoLog
SET StopDate = $2, SET StopDate = $2,
Level = 'FAILED', Level = 'FAILED',
Msg = $3 Msg = $3
WHERE LogId = $1 WHERE LogId = $1
""" """
async with risotto_context.log_connection.transaction(): await risotto_context.log_connection.execute(sql,
await risotto_context.log_connection.execute(sql, risotto_context.start_id,
risotto_context.start_id, datetime.now(),
datetime.now(), err,
err, )
)
async def info(self, async def info(self,
risotto_context, risotto_context,
@ -210,6 +210,8 @@ class Logger:
if get_config()['global']['debug']: if get_config()['global']['debug']:
print(msg) print(msg)
await self.insert(msg, await self.insert(msg,
'',
None,
risotto_context, risotto_context,
'Info', 'Info',
) )

View File

@ -298,36 +298,37 @@ class RegisterDispatcher:
) -> None: ) -> None:
internal_user = get_config()['global']['internal_user'] internal_user = get_config()['global']['internal_user']
async with self.pool.acquire() as log_connection: async with self.pool.acquire() as log_connection:
async with self.pool.acquire() as connection: async with log_connection.transaction():
await connection.set_type_codec( async with self.pool.acquire() as connection:
'json', await connection.set_type_codec(
encoder=dumps, 'json',
decoder=loads, encoder=dumps,
schema='pg_catalog' decoder=loads,
) schema='pg_catalog'
if truncate: )
if truncate:
async with connection.transaction():
await connection.execute('TRUNCATE InfraServer, InfraSite, InfraZone, Log, ProviderDeployment, ProviderFactoryCluster, ProviderFactoryClusterNode, SettingApplicationservice, SettingApplicationServiceDependency, SettingRelease, SettingServer, SettingServermodel, SettingSource, UserRole, UserRoleURI, UserURI, UserUser, InfraServermodel, ProviderZone, ProviderServer, ProviderSource, ProviderApplicationservice, ProviderServermodel')
async with connection.transaction(): async with connection.transaction():
await connection.execute('TRUNCATE InfraServer, InfraSite, InfraZone, Log, ProviderDeployment, ProviderFactoryCluster, ProviderFactoryClusterNode, SettingApplicationservice, SettingApplicationServiceDependency, SettingRelease, SettingServer, SettingServermodel, SettingSource, UserRole, UserRoleURI, UserURI, UserUser, InfraServermodel, ProviderZone, ProviderServer, ProviderSource, ProviderApplicationservice, ProviderServermodel') for submodule_name, module in self.injected_self.items():
async with connection.transaction(): risotto_context = Context()
for submodule_name, module in self.injected_self.items(): risotto_context.username = internal_user
risotto_context = Context() risotto_context.paths.append(f'internal.{submodule_name}.on_join')
risotto_context.username = internal_user risotto_context.type = None
risotto_context.paths.append(f'internal.{submodule_name}.on_join') risotto_context.log_connection = log_connection
risotto_context.type = None risotto_context.connection = connection
risotto_context.log_connection = log_connection risotto_context.module = submodule_name.split('.', 1)[0]
risotto_context.connection = connection info_msg = _(f'in function risotto_{submodule_name}.on_join')
risotto_context.module = submodule_name.split('.', 1)[0] await log.info_msg(risotto_context,
info_msg = _(f'in function risotto_{submodule_name}.on_join') None,
await log.info_msg(risotto_context, info_msg)
None, try:
info_msg) await module.on_join(risotto_context)
try: except Exception as err:
await module.on_join(risotto_context) if get_config()['global']['debug']:
except Exception as err: print_exc()
if get_config()['global']['debug']: msg = _(f'on_join returns an error in module {submodule_name}: {err}')
print_exc() await log.error_msg(risotto_context, {}, msg)
msg = _(f'on_join returns an error in module {submodule_name}: {err}')
await log.error_msg(risotto_context, {}, msg)
async def load(self): async def load(self):
# valid function's arguments # valid function's arguments