from os import urandom  # , unlink
from binascii import hexlify
from traceback import print_exc
from json import dumps
from typing import Dict, List, Optional, Any
from tiramisu import Storage


from ...http import register as register_http
from ...config import DEBUG
from ...context import Context
from ...utils import _
from ...error import CallError
from .storage import storage_server, storage_servermodel
from ...controller import Controller
from ...register import register
from ...dispatcher import dispatcher


class Risotto(Controller):
    """Controller handling the ``v1.session.server.*`` and
    ``v1.session.servermodel.*`` messages plus the ``/config/...`` HTTP routes.

    Sessions live in ``storage_server`` or ``storage_servermodel``; which one
    is used depends on the session type carried in the message name.

    NOTE(review): only ``stop_session`` and the two HTTP routes call
    ``valid_user``; the other handlers act on any ``session_id`` they are
    given. Confirm whether that is intended.
    """

    def __init__(self):
        self.modify_storage = Storage(engine='dictionary')

    @staticmethod
    def _session_type(risotto_context: Context) -> str:
        """Return the second-to-last dotted component of the message name
        (``'server'`` for ``v1.session.server.start``).
        """
        return risotto_context.message.rsplit('.', 2)[-2]

    @staticmethod
    def _get_storage(type: str):
        """Return ``storage_server`` for ``'server'``, else ``storage_servermodel``."""
        if type == 'server':
            return storage_server
        return storage_servermodel

    def valid_user(self,
                   session_id: str,
                   risotto_context: Context,
                   type: str) -> None:
        """ check if current user is the session owner

        Raises ``NotAllowedError`` when the context's username differs from
        the username stored with the session.
        """
        # NotAllowedError was referenced here without ever being imported, so
        # a mismatch used to surface as a NameError.
        # NOTE(review): presumably defined next to CallError in ...error - confirm.
        from ...error import NotAllowedError

        owner = self._get_storage(type).get_session(session_id)['username']
        if risotto_context.username != owner:
            raise NotAllowedError()

    @register(['v1.session.server.start', 'v1.session.servermodel.start'], None)
    async def start_session(self,
                            risotto_context: Context,
                            id: int) -> Dict:
        """ start a new config session for a server or a servermodel

        If the current user already has a session on this ``id``, that session
        is returned instead of creating a new one.

        Raises ``Exception`` when ``id`` is unknown to the config service.
        """
        session_type = self._session_type(risotto_context)
        config_module = dispatcher.get_service('config')
        if session_type == 'server':
            if id not in config_module.server:
                raise Exception(_(f'cannot find {session_type} with id {id}'))
            config = config_module.server[id]['server']
        else:
            if id not in config_module.servermodel:
                raise Exception(_(f'cannot find {session_type} with id {id}'))
            config = config_module.servermodel[id]
        storage = self._get_storage(session_type)
        username = risotto_context.username

        # check if a session already exists, in this case returns it
        for sess in self.list_sessions(session_type):
            if sess['id'] == id and sess['username'] == username:
                session_id = sess['session_id']
                session = self.get_session(session_id, session_type)
                return self.format_session(session_id, session)

        # create a new session, drawing random ids until an unused one is found
        while True:
            session_id = 'z' + hexlify(urandom(23)).decode()
            if not storage.has_session(session_id):
                break
            print('session {} already exists'.format(session_id))
        storage.add_session(session_id,
                            config,
                            id,
                            username,
                            self.modify_storage)
        return self.get_session_informations(session_id, session_type)

    @register(['v1.session.server.list', 'v1.session.servermodel.list'], None)
    async def list_session_server(self,
                                  risotto_context: Context):
        """List the formatted sessions of the type named in the message."""
        return self.list_sessions(self._session_type(risotto_context))

    @register(['v1.session.server.filter', 'v1.session.servermodel.filter'], None)
    async def filter_session(self,
                             risotto_context: Context,
                             session_id: str,
                             namespace: str,
                             mode: str,
                             debug: Optional[bool]):
        """Update namespace, mode and/or debug flag of a session.

        Arguments left to ``None`` are not touched. Returns the formatted
        session. Raises ``CallError`` if ``mode`` is not one of
        ``basic``, ``normal`` or ``expert``.
        """
        # Validate before mutating anything, so that a bad mode cannot leave
        # the session with only the namespace updated.
        if mode is not None and mode not in ('basic', 'normal', 'expert'):
            raise CallError(f'unknown mode {mode}')

        session_type = self._session_type(risotto_context)
        storage = self._get_storage(session_type)
        if namespace is not None:
            storage.set_namespace(session_id, namespace)
        if mode is not None:
            storage.set_config_mode(session_id, mode)
        if debug is not None:
            storage.set_config_debug(session_id, debug)
        return self.get_session_informations(session_id, session_type)

    @register(['v1.session.server.configure', 'v1.session.servermodel.configure'], None)
    async def configure_session(self,
                                risotto_context: Context,
                                session_id: str,
                                action: str,
                                name: str,
                                index: int,
                                value: Any,
                                value_multi: Optional[List]) -> Dict:
        """Apply one update (``action`` on option ``name``) to a session.

        Returns a dict with ``session_id``, ``name``, optionally ``index``,
        and ``status`` (``'ok'`` or ``'error'``); on error ``message`` holds
        the exception text.
        """
        session_type = self._session_type(risotto_context)
        session = self.get_session(session_id, session_type)
        ret = {'session_id': session_id,
               'name': name}
        if index is not None:
            ret['index'] = index

        option = session['config'].option(name).option
        # for a multi option that is not a follower the value comes from value_multi
        if option.ismulti() and not option.isfollower():
            value = value_multi

        update = {'name': name,
                  'action': action,
                  'value': value}
        if index is not None:
            update['index'] = index

        try:
            session['option'].updates({'updates': [update]})
        except Exception as err:
            # the failure is reported to the caller rather than propagated
            if DEBUG:
                print_exc()
            ret['message'] = str(err)
            ret['status'] = 'error'
        else:
            ret['status'] = 'ok'
        return ret

    @register(['v1.session.server.validate', 'v1.session.servermodel.validate'], None)
    async def validate_session(self,
                               risotto_context: Context,
                               session_id: str) -> Dict:
        """Validate the values of a session.

        Returns a dict whose ``status`` is ``'error'`` (with ``message``),
        ``'incomplete'`` (with ``mandatories``, server sessions only) or
        ``'ok'``.
        """
        session_type = self._session_type(risotto_context)
        session = self.get_session(session_id, session_type)
        config = session['config']
        try:
            config.forcepermissive.option(session['namespace']).value.dict()
        except Exception as err:
            return {'status': 'error',
                    'message': str(err)}

        # mandatory options are only checked for server sessions
        if session_type == 'server':
            mandatories = list(config.forcepermissive.value.mandatory())
            if mandatories:
                return {'status': 'incomplete',
                        'mandatories': mandatories}
        return {'status': 'ok'}

    @register(['v1.session.server.get', 'v1.session.servermodel.get'], None)
    async def get_session_(self,
                           risotto_context: Context,
                           session_id: str) -> Dict:
        """Return the formatted session plus ``content``: the JSON dump of
        the session's option values (full paths).
        """
        session_type = self._session_type(risotto_context)
        info = self.get_session_informations(session_id, session_type)
        session = self.get_session(session_id, session_type)
        info['content'] = dumps(session['option'].value.dict(fullpath=True))
        return info

    @register(['v1.session.server.stop', 'v1.session.servermodel.stop'], None)
    async def stop_session(self,
                           risotto_context: Context,
                           session_id: str,
                           save: bool) -> Dict:
        """Delete a session owned by the current user, saving its values
        first when ``save`` is true. Returns the formatted (deleted) session.
        """
        session_type = self._session_type(risotto_context)
        self.valid_user(session_id, risotto_context, session_type)
        session = self.get_session(session_id, session_type)
        storage = self._get_storage(session_type)
        if save:
            storage.save_values(session_id)
        storage.del_session(session_id)
        return self.format_session(session_id, session)

    @register_http('v1', '/config/server/{session_id}')
    async def get_server_api(self,
                             request,
                             risotto_context: Context,
                             session_id: str) -> Dict:
        """HTTP route: return the option dict of a server session owned by
        the current user.
        """
        self.valid_user(session_id, risotto_context, 'server')
        session = storage_server.get_session(session_id)
        return self.load_dict(session)

    @register_http('v1', '/config/servermodel/{session_id}')
    async def get_servermodel_api(self,
                                  request,
                                  risotto_context: Context,
                                  session_id: str) -> Dict:
        """HTTP route: return the option dict of a servermodel session owned
        by the current user.
        """
        self.valid_user(session_id, risotto_context, 'servermodel')
        session = storage_servermodel.get_session(session_id)
        return self.load_dict(session)

    def get_session(self,
                    session_id: str,
                    type: str) -> Dict:
        """ Get session information from storage
        """
        return self._get_storage(type).get_session(session_id)

    def get_session_informations(self,
                                 session_id: str,
                                 type: str) -> Dict:
        """ format session with a session ID name
        """
        session = self.get_session(session_id, type)
        return self.format_session(session_id, session)

    def format_session(self,
                       session_name: str,
                       session: Dict) -> Dict:
        """ format session

        Copies the public fields of ``session`` and exposes ``session_name``
        under the ``session_id`` key.
        """
        return {'session_id': session_name,
                'id': session['id'],
                'username': session['username'],
                'timestamp': session['timestamp'],
                'namespace': session['namespace'],
                'mode': session['mode'],
                'debug': session['debug']}

    def list_sessions(self,
                      type: str) -> List:
        """Return every session of the given type, formatted."""
        sessions = self._get_storage(type).get_sessions()
        return [self.format_session(session_id, session)
                for session_id, session in sessions.items()]

    def load_dict(self,
                  session: Dict) -> Dict:
        """Return ``session['option'].dict(remotable='all')``."""
        return session['option'].dict(remotable='all')