import time from typing import Dict from tiramisu import Config from rougail import modes from ...error import CallError, NotAllowedError class StorageError(Exception): pass class Storage(object): __slots__ = ('sessions',) def __init__(self): self.sessions = {} def add_session(self, session_id: int, orig_config: Config, server_id: int, username: str, config_storage): prefix_id = f'{session_id}_' config_name = self.get_config_name(server_id) config_id = f'{prefix_id}{config_name}' # copy Config and all it's parents meta = orig_config.config.deepcopy(session_id=config_id, storage=config_storage, metaconfig_prefix=prefix_id) # retrieve the copied config (not metaconfig) config = meta while True: try: children = list(config.config.list()) if not children: # it's an empty metaconfig break config = children[0] except: # it's a config, so no "list" method break config.property.read_write() # set the default owner self.set_owner(config, username) # store it self.sessions[session_id] = {'config': config, # do not delete meta, so keep it! 'meta': meta, 'id': server_id, 'timestamp': int(time.time()), 'username': username} self.set_config_mode(session_id, 'normal') self.set_config_debug(session_id, False) self.set_namespace(session_id, 'creole') def set_config_mode(self, id: int, mode: str): """ Define which edition mode to select """ config = self.sessions[id]['config'] for mode_level in modes.values(): if modes[mode] < mode_level: config.property.add(mode_level.name) else: config.property.pop(mode_level.name) self.sessions[id]['mode'] = mode def set_config_debug(self, id_, is_debug): """ Enable/Disable debug mode """ config = self.sessions[id_]['config'] if is_debug: config.property.pop('hidden') else: config.property.add('hidden') self.sessions[id_]['debug'] = is_debug def set_namespace(self, session_id: int, namespace: str): self.sessions[session_id]['option'] = self.sessions[session_id]['config'].option(namespace) self.sessions[session_id]['namespace'] = namespace def get_sessions(self): return self.sessions; def get_session(self, session_id: int, username: str) -> Dict: if session_id not in self.sessions: raise Exception(f'the session {id} not exists') session = self.sessions[session_id] if username != session['username']: raise NotAllowedError() return session def del_session(self, id: int): del self.sessions[id] class StorageServer(Storage): def get_config_name(self, server_id: int): return f'std_{server_id}' def set_owner(self, config: Config, username: str): config.owner.set(username) class StorageServermodel(Storage): def get_config_name(self, server_id: int): return f'v_{server_id}' def set_owner(self, config: Config, username: str): config.owner.set('servermodel_' + username) storage_server = StorageServer() storage_servermodel = StorageServermodel()