From 3446a3828727a2b39853119dbe56c9f6e0341ebd Mon Sep 17 00:00:00 2001 From: Emmanuel Garette Date: Mon, 24 Dec 2018 09:28:44 +0100 Subject: [PATCH] add first version of tiramisu-json-api --- tiramisu_json_api/api.py | 577 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 577 insertions(+) create mode 100644 tiramisu_json_api/api.py diff --git a/tiramisu_json_api/api.py b/tiramisu_json_api/api.py new file mode 100644 index 0000000..9842b3f --- /dev/null +++ b/tiramisu_json_api/api.py @@ -0,0 +1,577 @@ +from typing import Optional, Dict, List, Any +import warnings +import re + + +from .tiramisu_json import TiramisuJson, _ +from tiramisu.error import APIError, ValueWarning, ValueOptionError, ValueErrorWarning, PropertiesOptionError +from tiramisu.setting import undefined + + +class Option: + def __init__(self, + name, + path): + self.name = name + self.path = path + + def __call__(self): + return self + + def impl_get_display_name(self): + return self.name + + def impl_getpath(self): + return self.path + + +class TiramisuOptionOption: + def __init__(self, + path: str, + schema: Dict) -> None: + self._path = path + self.schema = schema + + def doc(self): + return self.schema['title'] + + def path(self): + return self._path + + def isoptiondescription(self): + return self.schema['type'] in ['object', 'array'] + + def ismasterslaves(self): + return self.schema['type'] == 'array' + + def issymlinkoption(self) -> bool: + # FIXME + return False + + def ismulti(self) -> bool: + return self.schema.get('isMulti', False) + + def type(self) -> str: + if self.ismasterslaves(): + return 'masterslaves' + if self.isoptiondescription(): + return 'optiondescription' + types = {'number': 'int', + 'choice': 'choice', + 'boolean': 'bool', + 'password': 'password', + 'date': 'date', + 'domain': 'domainname', + 'url': 'url', + 'username': 'username', + 'string': 'str'} + if self.schema['type'] in types : + return types[self.schema['type']] + raise Exception('unsupported type {}'.format(self.schema['type'])) + + +class TiramisuOptionProperty: + def __init__(self, + model: Dict) -> None: + self.model = model + + def get(self): + properties = self.model.get('properties', [])[:] + # FIXME if slave: + #if not isslave and childapi.option.ismulti(): + # if 'empty' in props: + # obj['required'] = True + # props.remove('empty') + # if 'mandatory' in props: + # obj['needs_len'] = True + # props.remove('mandatory') + if self.model.get('required', False): + properties.append('mandatory') + if self.model.get('readOnly', False): + properties.append('frozen') + if self.model.get('hidden', False): + properties.append('hidden') + return properties + + +class TiramisuOptionValue: + def __init__(self, + config: 'Config', + schema: Dict, + model: List[Dict], + form: List[Dict], + temp: List[Dict], + path: str, + index: int) -> None: + self.config = config + self.schema = schema + self.model = model + self.form = form + self.temp = temp + self.path = path + self.index = index + + def _display_warnings(self, path, value, type, name): + if self.model.get(path, {}).get('error'): + for err in self.model.get(path, {}).get('error'): + warnings.warn_explicit(ValueOptionError(value, + type, + Option(name, path), + '{0}'.format(err)), + ValueErrorWarning, + self.__class__.__name__, 0) + + if self.model.get(path, {}).get('warnings'): + for warn in self.model.get(path, {}).get('warnings'): + warnings.warn_explicit(ValueWarning('{0}'.format(warn), + Option(name, path)), + ValueWarning, + self.__class__.__name__, 0) + + def _dict_walk(self, + ret: Dict, + schema: Dict): + for key, option in schema['properties'].items(): + hidden = self.temp.get(self.path, {}).get('hidden', None) + if hidden is False or (hidden is None and \ + not self.model.get(self.path, {}).get('hidden', False) and \ + self.model.get(self.path, {}).get('display', True)): + if option['type'] == 'object': + self._dict_walk(ret, option) + else: + value = self._get_value(key, schema) + self._display_warnings(key, value, option['type'], option['name']) + ret[key] = value + + def _get_value(self, + path: str, + schema: Dict) -> Any: + if 'value' in self.temp.get(path, {}): + value = self.temp[path]['value'] + else: + value = self.model.get(path, {}).get('value') + if value is None and schema.get('isMulti', False): + value = [] + return value + + def dict(self, + fullpath: bool=False, + withwarning: bool=False): + if not fullpath or not withwarning: + raise NotImplementedError() + ret = {} + self._dict_walk(ret, self.schema) + return ret + + def get(self) -> Any: + value = self._get_value(self.path, self.schema) + self._display_warnings(self.path, value, self.schema['type'], self.schema['name']) + return value + + def list(self): + return self.schema['enum'] + + def set(self, value): + self.config.modify_value(self.path, + self.index, + value, + self.form.get(self.path, {}).get('remote', False)) + self._display_warnings(self.path, value, self.schema['type'], self.schema['name']) + + def reset(self): + self.config.delete_value(self.path, + self.index, + self.form.get(self.path, {}).get('remote', False)) + + +class TiramisuOption: + def __init__(self, + config: 'Config', + schema: Dict, + model: List[Dict], + form: List[Dict], + temp: List[Dict], + path: str, + index: Optional[int]) -> None: + self.config = config + self.schema = schema + self.model = model + self.form = form + self.temp = temp + self.path = path + self.index = index + + def __getattr__(self, + subfunc: str) -> Any: + if subfunc == 'option': + if self.index != None: + raise NotImplementedError() + return TiramisuOptionOption(self.path, + self.schema) + if subfunc == 'value': + return TiramisuOptionValue(self.config, + self.schema, + self.model, + self.form, + self.temp, + self.path, + self.index) + if subfunc == 'property': + if self.index != None: + raise NotImplementedError() + return TiramisuOptionProperty(self.model.get(self.path, {})) + raise APIError(_('please specify a valid sub function ({})').format(subfunc)) + + def group_type(self): + hidden = self.temp.get(self.path, {}).get('hidden', None) + if hidden is False or (hidden is None and \ + not self.model.get(self.path, {}).get('hidden', False) and \ + self.model.get(self.path, {}).get('display', True)): + # FIXME + return 'default' + raise PropertiesOptionError(None, None, None, opt_type='optiondescription') + + def list(self, + type='option'): + if type != 'all': + raise NotImplementedError() + for path, schema in self.schema['properties'].items(): + hidden = self.temp.get(path, {}).get('hidden', None) + if self.temp.get(path, {}).get('hidden', False) is not True and \ + not self.model.get(path, {}).get('hidden', False) and \ + self.model.get(path, {}).get('display', True): + yield TiramisuOption(self.config, + schema, + self.model, + self.form, + self.temp, + path, + self.index) + + +class TiramisuContextProperty: + # def __init__(self, + # json): + # self.json = json + + def get(self): + # FIXME ? + return ['demoting_error_warning'] + + +class Config: + def __init__(self, + json): + self.model_ori = json['model'] + self.model = {option['key']: option for option in json['model']} + self.form = {} + for option in json['form']: + if option.get('key'): + if 'pattern' in option: + option['pattern'] = re.compile(option['pattern']) + self.form[option['key']] = option + self.temp = {} + self.schema = json['schema'] + self.updates = [] + + def __getattr__(self, + subfunc: str) -> Any: + if subfunc == 'property': + return TiramisuContextProperty() + raise APIError(_('please specify a valid sub function ({})').format(subfunc)) + + def option(self, + path: str, + index: Optional[int]=None) -> TiramisuOption: + + first = next(iter(self.schema.keys())) + if '.' in first: + root_path = first.rsplit('.', 1)[0] + len_root_path = len(root_path) + len_path = len(path) + if len_root_path >= len_path: + spath = [] + else: + spath = path[len_root_path + 1:].split('.') + schema = {'properties': self.schema} + else: + root_path, *spath = path.split('.') + schema = self.schema[root_path]['properties'] + fullsubpath = [root_path] + for subpath in spath: + fullsubpath.append(subpath) + schema = schema['properties']['.'.join(fullsubpath)] + + return TiramisuOption(self, + schema, + self.model, + self.form, + self.temp, + path, + index) + + def add_value(self, + path: str, + index: Optional[int], + value: Any, + remote: bool) -> None: + self.updates_value('add', + path, + index, + value, + remote, + None) + + def modify_value(self, + path: str, + index: Optional[int], + value: Any, + remote: bool) -> None: + schema = self.get_schema(path) + if value and isinstance(value, list) and value[-1] is undefined: + new_value = schema.get('defaultvalue') + if new_value is None: + len_value = len(value) + if len(schema.get('value')) >= len_value: + new_value = schema.get('value')[len_value - 1] + value[-1] = new_value + + self.updates_value('modify', + path, + index, + value, + remote, + None) + + def delete_value(self, + path: str, + index: Optional[int], + remote: bool) -> None: + schema = self.get_schema(path) + value = schema['value']; + if value is None and schema.get('isMulti', False): + value = [] + self.updates_value('delete', + path, + index, + value, + remote, + None) + + def get_schema(self, + path): + first_path = next(iter(self.schema.keys())) + if '.' in first_path: + root = first_path.rsplit('.', 1)[0].split('.') + else: + root = [] + + s_path = path.split('.') + schema = {'properties': self.schema} + root_path = '.'.join(root) + for subpath in path.split('.')[len(root):]: + if root_path: + root_path += '.' + subpath + else: + root_path = subpath + schema = schema['properties'][root_path] + return schema + + + + def updates_value(self, + action: str, + path: str, + index: Optional[int], + value: Optional[Any], + remote: bool, + masterslaves: Optional[str]) -> None: + update_last_action = False + if self.updates: + last_body = self.updates[-1] + if last_body['name'] == path: + if index is None and not 'index' in last_body: + last_action = last_body['action'] + if last_action == action or \ + last_action in ['delete', 'modify'] and action in ['delete', 'modify']: + update_last_action = True + elif index == None and action == 'delete': + for update in reversed(self.updates): + update['name'] + if masterslaves == None and update['name'] == path or \ + masterslaves and path.startswith(masterslaves + '.'): + del self.updates[-1] + else: + break + elif last_body['index'] == index: + if last_body['action'] == 'add' and action == 'modify': + action = 'add' + update_last_action = True + elif last_body['action'] == action and action != 'delete' or \ + last_body['action'] == 'modify' and action == 'delete': + update_last_action = True + elif last_body['action'] == 'add' and action == 'delete': + del this.updates[-1] + + if update_last_action: + if value is None: + if 'value' in last_body: + del last_body['value'] + else: + last_body['value'] = value + if index is None and 'index' in last_body: + del last_body['index'] + last_body['action'] = action + else: + data = {'action': action, + 'name': path} + if value is not None: + data['value'] = value + if index is not None: + data['index'] = index + self.updates.append(data) + if 'pattern' in self.form.get(path, {}) and (not isinstance(value, list) or undefined not in value): + match = self.test_value(path, value, remote) + else: + match = True + if match: + if remote: + ret = self.send_data({'updates': self.updates, + 'model': self.model_ori}) + self.updates = [] + self.temp.clear() + # FIXME remove old key ? + for model in ret['model']: + self.model[model['key']] = model + self.model_ori = ret['model'] + else: + self.temp.setdefault(path, {})['owner'] = 'tmp' + self.temp[path]['value'] = value + self.set_dependencies(path, action, value) + self.set_not_equal(path, value) + self.do_copy(path, value) + + def test_value(self, + path: str, + value: Any, + remote: bool): + if isinstance(value, list): + for val in value: + if not self.test_value(path, val, remote): + return False + return True + else: + if value is None: + match = True + else: + match = self.form[path]['pattern'].search(value) + if not remote: + if not match: + self.temp.setdefault(path, {})['error'] = [''] + elif 'error' in self.model[path]: + del self.temp[path]['error'] + return match + + def set_dependencies(self, + path: str, + action: str, + value: Any) -> None: + dependencies = self.form.get(path, {}).get('dependencies', {}) + if dependencies: + if value in dependencies['expected']: + expected = dependencies['expected'][value] + else: + expected = dependencies['default'] + for action in ['hide', 'show']: + expected_actions = expected.get(action) + if expected_actions: + for expected_path in expected_actions: + self.temp[expected_path] = {'hidden': action == 'hide'} + + def set_not_equal(self, + path: str, + value: Any) -> None: + not_equal = self.form.get(path, {}).get('not_equal', {}) + if not_equal: + vals = [] + opts = [] + if isinstance(value, list): + for val in value: + vals.append(val) + opts.append(path) + else: + vals.append(value) + opts.append(path) + for path_ in self.form[path]['not_equal']['options']: + schema = self.get_schema(path_) + if not schema.get('isMulti', False): + default_value = None + else: + default_value = [] + p_value = self.model[path_].get('value', default_value) + if isinstance(p_value, list): + for val in p_value: + vals.append(val) + opts.append(path_) + else: + vals.append(p_value) + opts.append(path_) + equal = [] + is_current = False + warnings_only = self.form[path]['not_equal'].get('warnings', False) + if warnings_only: + msg = _('should be different from the value of "{}"') + msgcurr = _('value for {} should be different') + else: + msg = _('must be different from the value of "{}"') + msgcurr = _('value for {} must be different') + for idx_inf, val_inf in enumerate(vals): + for idx_sup, val_sup in enumerate(vals[idx_inf + 1:]): + if val_inf == val_sup is not None: + for opt_ in [opts[idx_inf], opts[idx_inf + idx_sup + 1]]: + if opt_ == path: + is_current = True + if opt_ not in equal: + equal.append(opt_) + if equal: + equal_name = {} + for opt in equal: + schema = self.get_schema(opt) + equal_name[opt] = schema['title'] + len_equal_name = len(equal) + for opt_ in equal: + display_equal = [] + for opt__ in equal: + if opt_ != opt__: + display_equal.append(equal_name[opt_]) + display_equal = ', '.join(display_equal) + if opt_ == path: + msg_ = msgcurr.format(display_equal) + else: + msg_ = msg.format(display_equal) + if warnings_only: + self.model[opt_].setdefault('warnings', []).append(msg_) + else: + self.model[opt_].setdefault('error', []).append(msg_) + else: + for opt in opts: + if 'warnings' in self.model[opt]: + del self.model[opt]['warnings'] + if 'error' in self.model[opt]: + del self.model[opt]['error'] + + def do_copy(self, + path: str, + value: Any) -> None: + copy = self.form.get(path, {}).get('copy') + if copy: + for opt in copy: + owner = self.temp.get(opt, {}).get('owner') + if owner is None: + owner = self.model[opt]['owner'] + if owner == 'default': + self.model[opt]['value'] = value + + def send_data(self, + updates): + raise NotImplementedError('please implement send_data function')