risotto/src/risotto/services/config/storage.py

139 lines
4.8 KiB
Python

import time
from rougail import modes
class StorageError(Exception):
    """Error raised by the session storage classes of this module."""
class Storage(object):
    """Registry of configuration editing sessions, kept in memory.

    ``sessions`` maps a session id to a dict holding the working config, the
    copied meta config, the original config, the server id, the creation
    timestamp, the username, the selected option, the namespace, the edition
    mode, the debug flag and the ``config_exists`` flag.

    ``add_session`` calls ``self.transform_orig_config(orig_config, server_id)``
    and ``self.set_owner(config, username)``. This class does not define
    them, so they have to be supplied by subclasses.
    """
    __slots__ = ('sessions',)

    def __init__(self):
        self.sessions = {}

    def has_session(self, id_):
        """Return True when a session is registered under ``id_``."""
        return id_ in self.sessions

    def config_exists(self, id_):
        """Return the ``config_exists`` flag computed when the session was added.

        :raises KeyError: if no session is registered under ``id_``
        """
        return self.sessions[id_]['config_exists']

    def add_session(self, session_id, orig_config, server_id, username, storage):
        """Open an editing session working on a copy of ``orig_config``.

        :param session_id: key under which the session is registered
        :param orig_config: configuration the working copy is made from
        :param server_id: identifier stored as the session's ``id``; at most
                          one session may exist per identifier
        :param username: user opening the session
        :param storage: forwarded as ``storage=`` to ``deepcopy``
        :raises StorageError: if a session already exists for ``server_id``
        """
        for session in self.sessions.values():
            if session['id'] == server_id:
                # Report the user holding the existing session, not the
                # requester.
                holder = session['username']
                # NOTE(review): `_` is not imported in this module; presumably
                # it is installed as a builtin (gettext) -- TODO confirm.
                raise StorageError(_(f'{holder} already edits this configuration'))
        prefix_id = "{}_".format(session_id)
        config_server, orig_config = self.transform_orig_config(orig_config, server_id)
        config_id = "{}{}".format(prefix_id, config_server)
        meta = orig_config.config.deepcopy(session_id=config_id,
                                           storage=storage,
                                           metaconfig_prefix=prefix_id)
        # Follow the first child at each level until reaching an object that
        # has no children or cannot list any.
        config = meta
        while True:
            try:
                children = list(config.config.list())
            except Exception:
                # Not a bare except: KeyboardInterrupt/SystemExit must propagate.
                break
            if not children:
                break
            config = children[0]
        config.property.read_write()
        self.set_owner(config, username)
        orig_values = config.value.exportation()
        config.information.set('orig_values', orig_values)
        # The entries at index 3 of the exportation are treated as owners:
        # any owner other than 'forced' marks the configuration as existing.
        config_exists = any(
            set(owner) != {'forced'} if isinstance(owner, list) else owner != 'forced'
            for owner in orig_values[3]
        )
        self.sessions[session_id] = {'config': config,
                                     # do not delete meta, so keep it!
                                     'meta': meta,
                                     'orig_config': orig_config,
                                     'id': server_id,
                                     'timestamp': int(time.time()),
                                     'username': username,
                                     'option': None,
                                     'namespace': 'creole',
                                     'config_exists': config_exists}
        # These two calls add the 'mode' and 'debug' keys to the session.
        self.set_config_mode(session_id, 'normal')
        self.set_config_debug(session_id, False)

    def list_sessions(self):
        """Yield one summary dict per registered session."""
        for session_id, session in self.sessions.items():
            yield {'session_id': session_id,
                   'id': session['id'],
                   'timestamp': session['timestamp'],
                   'username': session['username'],
                   'namespace': session['namespace'],
                   'mode': session['mode'],
                   'debug': session['debug']}

    def del_session(self, id_):
        """Remove the session registered under ``id_``.

        :raises KeyError: if no session is registered under ``id_``
        """
        del self.sessions[id_]

    def get_session(self, id_):
        """Return the session dict registered under ``id_``.

        :raises Exception: if no session is registered under ``id_``
        """
        if id_ not in self.sessions:
            raise Exception('please start a session before')
        return self.sessions[id_]

    def save_values(self, id_):
        """Import the session's values and permissives into the original config."""
        session = self.sessions[id_]
        config = session['config']
        orig_config = session['orig_config']
        values = config.value.exportation()
        orig_config.value.importation(values)
        orig_config.permissive.importation(config.permissive.exportation())
        # current values become old values in diff_config
        config.information.set('orig_values', values)

    def get_username(self, id_):
        """Return the username recorded for the session ``id_``."""
        return self.get_session(id_)['username']

    def set_config_mode(self, id_, mode):
        """ Define which edition mode to select
        """
        config = self.get_session(id_)['config']
        selected = modes[mode]
        for mode_level in modes.values():
            # Levels above the selected one are added as properties on the
            # config; the selected level and those below it are removed.
            if selected < mode_level:
                config.property.add(mode_level.name)
            else:
                config.property.pop(mode_level.name)
        self.sessions[id_]['mode'] = mode

    def set_config_debug(self, id_, is_debug):
        """ Enable/Disable debug mode
        """
        config = self.get_session(id_)['config']
        # Debug mode removes the 'hidden' property; otherwise it is added.
        if is_debug:
            config.property.pop('hidden')
        else:
            config.property.add('hidden')
        self.sessions[id_]['debug'] = is_debug
class StorageServer(Storage):
    """Storage variant whose config names are prefixed with ``std_``."""

    def transform_orig_config(self, orig_config, server_id):
        """Return ``(name, orig_config.config(name))`` with name ``std_<server_id>``."""
        name = f"std_{server_id}"
        return name, orig_config.config(name)

    def set_owner(self, config, username):
        """Set ``username`` as the owner on ``config``."""
        config.owner.set(username)
class StorageServermodel(Storage):
    """Storage variant whose config names are prefixed with ``v_``."""

    def transform_orig_config(self, orig_config, server_id):
        """Return ``(name, orig_config)`` with name ``v_<server_id>``.

        The original config is handed back unchanged.
        """
        return f"v_{server_id}", orig_config

    def set_owner(self, config, username):
        """Set ``servermodel_<username>`` as the owner on ``config``."""
        owner_name = 'servermodel_' + username
        config.owner.set(owner_name)
# Module-level storage instances, created once when this module is imported:
# one StorageServer and one StorageServermodel, each with its own sessions.
storage_server = StorageServer()
storage_servermodel = StorageServermodel()