"""Client-side access to a configuration described by a JSON document.

``Config`` wraps the ``schema``, ``model`` and ``form`` sections of the JSON
description.  Value changes are recorded as a list of pending updates and are
either kept locally (in ``temp``) or sent through ``Config.send_data``.
"""
from typing import Optional, Dict, List, Any
import warnings
import re


from .tiramisu_json import TiramisuJson, _
from tiramisu.error import (APIError, ValueWarning, ValueOptionError,
                            ValueErrorWarning, PropertiesOptionError)
from tiramisu.setting import undefined


def _is_displayed(temp: Dict, model: Dict, path: str) -> bool:
    """Tell whether the option at *path* is currently visible.

    A ``hidden`` flag stored in *temp* takes precedence: ``False`` forces the
    option to be displayed, any other non-``None`` value hides it.  Without a
    temporary flag the option is displayed unless *model* marks it ``hidden``
    or sets ``display`` to a false value.
    """
    hidden = temp.get(path, {}).get('hidden')
    if hidden is False:
        return True
    if hidden is not None:
        return False
    entry = model.get(path, {})
    return bool(not entry.get('hidden', False) and entry.get('display', True))


class Option:
    """Small name/path holder handed to the warning and error classes."""

    def __init__(self, name, path):
        self.name = name
        self.path = path

    def __call__(self):
        """Return the object itself."""
        return self

    def impl_get_display_name(self):
        """Return the option name."""
        return self.name

    def impl_getpath(self):
        """Return the option path."""
        return self.path


class TiramisuOptionOption:
    """Read-only information about an option, taken from its schema."""

    def __init__(self, path: str, schema: Dict) -> None:
        self._path = path
        self.schema = schema

    def doc(self):
        """Return the ``title`` entry of the schema."""
        return self.schema['title']

    def path(self):
        """Return the path given at construction."""
        return self._path

    def isoptiondescription(self):
        """Return True when the schema type is ``object`` or ``array``."""
        return self.schema['type'] in ['object', 'array']

    def ismasterslaves(self):
        """Return True when the schema type is ``array``."""
        return self.schema['type'] == 'array'

    def issymlinkoption(self) -> bool:
        """Always return False."""
        # FIXME: symlink options are not detected
        return False

    def ismulti(self) -> bool:
        """Return the ``isMulti`` entry of the schema (False when absent)."""
        return self.schema.get('isMulti', False)

    def type(self) -> str:
        """Translate the schema type into a type name.

        Raises:
            Exception: the schema type has no known translation.
        """
        if self.ismasterslaves():
            return 'masterslaves'
        if self.isoptiondescription():
            return 'optiondescription'
        types = {'number': 'int',
                 'choice': 'choice',
                 'boolean': 'bool',
                 'password': 'password',
                 'date': 'date',
                 'domain': 'domainname',
                 'url': 'url',
                 'username': 'username',
                 'string': 'str'}
        schema_type = self.schema['type']
        if schema_type in types:
            return types[schema_type]
        raise Exception('unsupported type {}'.format(schema_type))


class TiramisuOptionProperty:
    """Properties of one option, computed from its model entry."""

    def __init__(self, model: Dict) -> None:
        self.model = model

    def get(self):
        """Return a new list of property names for the option.

        The list starts as a copy of the model ``properties`` entry; then
        ``mandatory``, ``frozen`` and ``hidden`` are appended when the model
        sets ``required``, ``readOnly`` and ``hidden`` respectively.
        """
        # copy, so that the model itself is never modified
        properties = self.model.get('properties', [])[:]
        # FIXME: the 'empty'/'mandatory' distinction for multi (non slave)
        # options is not handled here
        if self.model.get('required', False):
            properties.append('mandatory')
        if self.model.get('readOnly', False):
            properties.append('frozen')
        if self.model.get('hidden', False):
            properties.append('hidden')
        return properties


class TiramisuOptionValue:
    """Read and change the value of one option (or of a whole subtree)."""

    def __init__(self,
                 config: 'Config',
                 schema: Dict,
                 model: Dict,
                 form: Dict,
                 temp: Dict,
                 path: str,
                 index: int) -> None:
        self.config = config
        self.schema = schema
        # model, form and temp are dictionaries indexed by option path
        self.model = model
        self.form = form
        self.temp = temp
        self.path = path
        self.index = index

    def _display_warnings(self, path, value, type, name):
        """Emit one Python warning per ``error``/``warnings`` model message."""
        entry = self.model.get(path, {})
        for err in entry.get('error') or []:
            warnings.warn_explicit(ValueOptionError(value,
                                                    type,
                                                    Option(name, path),
                                                    '{0}'.format(err)),
                                   ValueErrorWarning,
                                   self.__class__.__name__, 0)
        for warn in entry.get('warnings') or []:
            warnings.warn_explicit(ValueWarning('{0}'.format(warn),
                                                Option(name, path)),
                                   ValueWarning,
                                   self.__class__.__name__, 0)

    def _dict_walk(self, ret: Dict, schema: Dict):
        """Fill *ret* with the values of the visible options below *schema*.

        ``object`` children are walked recursively; every other child is
        stored in *ret* under its own key.
        """
        for key, option in schema['properties'].items():
            # visibility is evaluated for each child, not for the option the
            # walk started from
            if not _is_displayed(self.temp, self.model, key):
                continue
            if option['type'] == 'object':
                self._dict_walk(ret, option)
            else:
                # NOTE(review): the parent schema is passed here, so the
                # 'isMulti' fallback of _get_value looks at the parent --
                # confirm whether `option` was intended
                value = self._get_value(key, schema)
                self._display_warnings(key, value, option['type'], option['name'])
                ret[key] = value

    def _get_value(self, path: str, schema: Dict) -> Any:
        """Return the value of *path*, preferring ``temp`` over the model.

        A ``None`` value becomes ``[]`` when *schema* sets ``isMulti``.
        """
        if 'value' in self.temp.get(path, {}):
            value = self.temp[path]['value']
        else:
            value = self.model.get(path, {}).get('value')
        if value is None and schema.get('isMulti', False):
            value = []
        return value

    def dict(self, fullpath: bool=False, withwarning: bool=False):
        """Return a ``{path: value}`` dictionary of the visible options.

        Raises:
            NotImplementedError: unless both *fullpath* and *withwarning*
                are true.
        """
        if not fullpath or not withwarning:
            raise NotImplementedError()
        ret = {}
        self._dict_walk(ret, self.schema)
        return ret

    def get(self) -> Any:
        """Return the option value and emit the warnings attached to it."""
        value = self._get_value(self.path, self.schema)
        self._display_warnings(self.path, value, self.schema['type'], self.schema['name'])
        return value

    def list(self):
        """Return the ``enum`` entry of the schema."""
        return self.schema['enum']

    def set(self, value):
        """Record *value* as the new option value, then emit its warnings."""
        self.config.modify_value(self.path,
                                 self.index,
                                 value,
                                 self.form.get(self.path, {}).get('remote', False))
        self._display_warnings(self.path, value, self.schema['type'], self.schema['name'])

    def reset(self):
        """Record the deletion of the option value."""
        self.config.delete_value(self.path,
                                 self.index,
                                 self.form.get(self.path, {}).get('remote', False))


class TiramisuOption:
    """Entry point for one option; exposes ``option``, ``value``, ``property``."""

    def __init__(self,
                 config: 'Config',
                 schema: Dict,
                 model: Dict,
                 form: Dict,
                 temp: Dict,
                 path: str,
                 index: Optional[int]) -> None:
        self.config = config
        self.schema = schema
        # model, form and temp are dictionaries indexed by option path
        self.model = model
        self.form = form
        self.temp = temp
        self.path = path
        self.index = index

    def __getattr__(self, subfunc: str) -> Any:
        """Build the helper object matching the requested sub function.

        Raises:
            NotImplementedError: ``option`` or ``property`` is requested
                while an index is set.
            APIError: *subfunc* is not a known sub function.
        """
        if subfunc == 'option':
            if self.index is not None:
                raise NotImplementedError()
            return TiramisuOptionOption(self.path, self.schema)
        if subfunc == 'value':
            return TiramisuOptionValue(self.config,
                                       self.schema,
                                       self.model,
                                       self.form,
                                       self.temp,
                                       self.path,
                                       self.index)
        if subfunc == 'property':
            if self.index is not None:
                raise NotImplementedError()
            return TiramisuOptionProperty(self.model.get(self.path, {}))
        raise APIError(_('please specify a valid sub function ({})').format(subfunc))

    def group_type(self):
        """Return ``'default'`` when the option is visible.

        Raises:
            PropertiesOptionError: the option is not visible.
        """
        if _is_displayed(self.temp, self.model, self.path):
            # FIXME: the real group type is not available
            return 'default'
        raise PropertiesOptionError(None, None, None, opt_type='optiondescription')

    def list(self, type='option'):
        """Yield a ``TiramisuOption`` for each visible child.

        This is a generator: the ``NotImplementedError`` raised for any
        *type* other than ``'all'`` only shows up when iteration starts.
        """
        if type != 'all':
            raise NotImplementedError()
        for path, schema in self.schema['properties'].items():
            # same visibility rule as group_type()
            if _is_displayed(self.temp, self.model, path):
                yield TiramisuOption(self.config,
                                     schema,
                                     self.model,
                                     self.form,
                                     self.temp,
                                     path,
                                     self.index)


class TiramisuContextProperty:
    """Properties of the configuration itself."""

    def get(self):
        """Return the fixed list of context properties."""
        # FIXME ?
        return ['demoting_error_warning']


class Config:
    """Configuration built from a JSON description.

    The description must provide the ``model``, ``form`` and ``schema``
    entries.  ``send_data`` has to be implemented by a subclass for options
    whose form entry sets ``remote``.
    """

    def __init__(self, json):
        self.model_ori = json['model']
        self.model = {option['key']: option for option in json['model']}
        self.form = {}
        for option in json['form']:
            if option.get('key'):
                if 'pattern' in option:
                    # compiled once here, used by test_value()
                    option['pattern'] = re.compile(option['pattern'])
                self.form[option['key']] = option
        # values and flags that are only known locally, indexed by path
        self.temp = {}
        self.schema = json['schema']
        # pending updates
        self.updates = []

    def __getattr__(self, subfunc: str) -> Any:
        """Return the context helper for *subfunc*.

        Raises:
            APIError: *subfunc* is not ``property``.
        """
        if subfunc == 'property':
            return TiramisuContextProperty()
        raise APIError(_('please specify a valid sub function ({})').format(subfunc))

    def option(self, path: str, index: Optional[int]=None) -> TiramisuOption:
        """Return the ``TiramisuOption`` for *path* (and optional *index*)."""
        first = next(iter(self.schema.keys()))
        if '.' in first:
            # schema keys are full paths: everything before the last dot of
            # the first key is the root
            root_path = first.rsplit('.', 1)[0]
            if len(root_path) >= len(path):
                spath = []
            else:
                spath = path[len(root_path) + 1:].split('.')
            schema = {'properties': self.schema}
        else:
            root_path, *spath = path.split('.')
            schema = self.schema[root_path]
        fullsubpath = [root_path]
        for subpath in spath:
            fullsubpath.append(subpath)
            schema = schema['properties']['.'.join(fullsubpath)]
        return TiramisuOption(self,
                              schema,
                              self.model,
                              self.form,
                              self.temp,
                              path,
                              index)

    def add_value(self,
                  path: str,
                  index: Optional[int],
                  value: Any,
                  remote: bool) -> None:
        """Record an ``add`` update for *path*."""
        self.updates_value('add', path, index, value, remote, None)

    def modify_value(self,
                     path: str,
                     index: Optional[int],
                     value: Any,
                     remote: bool) -> None:
        """Record a ``modify`` update for *path*.

        When *value* is a list ending with ``undefined``, that last item is
        replaced in place by the schema ``defaultvalue`` or, failing that, by
        the item at the same position in the schema ``value`` (``None`` when
        there is no such item).
        """
        schema = self.get_schema(path)
        if value and isinstance(value, list) and value[-1] is undefined:
            new_value = schema.get('defaultvalue')
            if new_value is None:
                len_value = len(value)
                # the schema may have no default value at all
                defaults = schema.get('value') or []
                if len(defaults) >= len_value:
                    new_value = defaults[len_value - 1]
            value[-1] = new_value
        self.updates_value('modify', path, index, value, remote, None)

    def delete_value(self,
                     path: str,
                     index: Optional[int],
                     remote: bool) -> None:
        """Record a ``delete`` update for *path*, restoring the schema value."""
        schema = self.get_schema(path)
        value = schema.get('value')
        if value is None and schema.get('isMulti', False):
            value = []
        self.updates_value('delete', path, index, value, remote, None)

    def get_schema(self, path):
        """Return the schema of the option at *path*."""
        first_path = next(iter(self.schema.keys()))
        if '.' in first_path:
            root = first_path.rsplit('.', 1)[0].split('.')
        else:
            root = []
        schema = {'properties': self.schema}
        root_path = '.'.join(root)
        # schema keys are full paths, so the path is rebuilt level by level
        for subpath in path.split('.')[len(root):]:
            if root_path:
                root_path += '.' + subpath
            else:
                root_path = subpath
            schema = schema['properties'][root_path]
        return schema

    def updates_value(self,
                      action: str,
                      path: str,
                      index: Optional[int],
                      value: Optional[Any],
                      remote: bool,
                      masterslaves: Optional[str]) -> None:
        """Record an update, then apply it locally or send it.

        The update is merged into the last pending one when both target the
        same option (and index) and the actions are compatible; otherwise a
        new update is appended.  If the form defines a ``pattern`` for *path*
        the value is validated first.  A valid value is sent with
        ``send_data`` when *remote* is true; otherwise it is stored in
        ``temp`` and the ``dependencies``, ``not_equal`` and ``copy`` rules
        of the form are applied.
        """
        update_last_action = False
        last_body = None
        if self.updates:
            last_body = self.updates[-1]
            if last_body['name'] == path:
                if index is None and 'index' not in last_body:
                    last_action = last_body['action']
                    if last_action == action or \
                            (last_action in ['delete', 'modify'] and
                             action in ['delete', 'modify']):
                        update_last_action = True
                elif index is None and action == 'delete':
                    # deleting the whole option: drop the trailing updates
                    # that the deletion makes useless
                    # NOTE(review): with `masterslaves` set, the test does not
                    # depend on the update being examined -- confirm intent
                    while self.updates:
                        name = self.updates[-1]['name']
                        if (masterslaves is None and name == path) or \
                                (masterslaves and path.startswith(masterslaves + '.')):
                            del self.updates[-1]
                        else:
                            break
                elif last_body.get('index') == index:
                    # .get(): the previous update may not carry an index
                    if last_body['action'] == 'add' and action == 'modify':
                        # modifying a value that was just added is still an add
                        action = 'add'
                        update_last_action = True
                    elif (last_body['action'] == action and action != 'delete') or \
                            (last_body['action'] == 'modify' and action == 'delete'):
                        update_last_action = True
                    elif last_body['action'] == 'add' and action == 'delete':
                        del self.updates[-1]

        if update_last_action:
            if value is None:
                if 'value' in last_body:
                    del last_body['value']
            else:
                last_body['value'] = value
            if index is None and 'index' in last_body:
                del last_body['index']
            last_body['action'] = action
        else:
            data = {'action': action,
                    'name': path}
            if value is not None:
                data['value'] = value
            if index is not None:
                data['index'] = index
            self.updates.append(data)

        # a list still holding `undefined` cannot be validated yet
        if 'pattern' in self.form.get(path, {}) and \
                (not isinstance(value, list) or undefined not in value):
            match = self.test_value(path, value, remote)
        else:
            match = True
        if match:
            if remote:
                ret = self.send_data({'updates': self.updates,
                                      'model': self.model_ori})
                self.updates = []
                self.temp.clear()
                # FIXME remove old key ?
                for model in ret['model']:
                    self.model[model['key']] = model
                self.model_ori = ret['model']
            else:
                self.temp.setdefault(path, {})['owner'] = 'tmp'
                self.temp[path]['value'] = value
                self.set_dependencies(path, action, value)
                self.set_not_equal(path, value)
                self.do_copy(path, value)

    def test_value(self, path: str, value: Any, remote: bool):
        """Check *value* against the form ``pattern`` of *path*.

        Lists are valid when each item is valid; ``None`` is always valid.
        When *remote* is false the ``error`` entry of ``temp`` is set on a
        mismatch and removed on a match.

        Returns:
            A true value (``True`` or a match object) when *value* is valid,
            a false one (``False`` or ``None``) otherwise.
        """
        if isinstance(value, list):
            for val in value:
                if not self.test_value(path, val, remote):
                    return False
            return True
        if value is None:
            match = True
        else:
            match = self.form[path]['pattern'].search(value)
        if not remote:
            if not match:
                self.temp.setdefault(path, {})['error'] = ['']
            elif 'error' in self.temp.get(path, {}):
                # the value is valid again: forget the local error
                del self.temp[path]['error']
        return match

    def set_dependencies(self, path: str, action: str, value: Any) -> None:
        """Hide or show the options that depend on the value of *path*.

        The ``dependencies`` form entry maps expected values to ``hide`` and
        ``show`` lists of paths; its ``default`` entry is used for any other
        value.  The result is stored as a ``hidden`` flag in ``temp``.
        """
        dependencies = self.form.get(path, {}).get('dependencies', {})
        if not dependencies:
            return
        try:
            expected = dependencies['expected'][value]
        except (KeyError, TypeError):
            # unknown value, or a value that cannot be a key (e.g. a list)
            expected = dependencies['default']
        for visibility in ['hide', 'show']:
            for expected_path in expected.get(visibility) or []:
                # only the flag is updated, so a pending local value of the
                # dependent option is kept
                self.temp.setdefault(expected_path, {})['hidden'] = visibility == 'hide'

    def set_not_equal(self, path: str, value: Any) -> None:
        """Apply the ``not_equal`` rule of *path*.

        *value* is compared with the model values of the options listed in
        the rule.  When two non-``None`` values are equal a message is added
        to the ``warnings`` (rule has a true ``warnings`` entry) or ``error``
        list of every involved option in the model; when nothing is equal
        both lists are removed from all compared options.
        """
        not_equal = self.form.get(path, {}).get('not_equal', {})
        if not not_equal:
            return
        # vals[i] is a value owned by the option opts[i]
        vals = []
        opts = []
        if isinstance(value, list):
            for val in value:
                vals.append(val)
                opts.append(path)
        else:
            vals.append(value)
            opts.append(path)
        for path_ in not_equal['options']:
            schema = self.get_schema(path_)
            default_value = [] if schema.get('isMulti', False) else None
            p_value = self.model[path_].get('value', default_value)
            if isinstance(p_value, list):
                for val in p_value:
                    vals.append(val)
                    opts.append(path_)
            else:
                vals.append(p_value)
                opts.append(path_)

        warnings_only = not_equal.get('warnings', False)
        if warnings_only:
            msg = _('should be different from the value of "{}"')
            msgcurr = _('value for {} should be different')
        else:
            msg = _('must be different from the value of "{}"')
            msgcurr = _('value for {} must be different')

        # options owning a value that appears more than once
        equal = []
        for idx_inf, val_inf in enumerate(vals):
            if val_inf is None:
                continue
            for idx_sup, val_sup in enumerate(vals[idx_inf + 1:], idx_inf + 1):
                if val_inf == val_sup:
                    for opt_ in [opts[idx_inf], opts[idx_sup]]:
                        if opt_ not in equal:
                            equal.append(opt_)

        if equal:
            equal_name = {opt: self.get_schema(opt)['title'] for opt in equal}
            key = 'warnings' if warnings_only else 'error'
            for opt_ in equal:
                # the message names the *other* conflicting options
                display_equal = ', '.join(equal_name[opt__]
                                          for opt__ in equal
                                          if opt_ != opt__)
                if opt_ == path:
                    msg_ = msgcurr.format(display_equal)
                else:
                    msg_ = msg.format(display_equal)
                messages = self.model[opt_].setdefault(key, [])
                # do not pile up the same message on every call
                if msg_ not in messages:
                    messages.append(msg_)
        else:
            for opt in opts:
                if 'warnings' in self.model[opt]:
                    del self.model[opt]['warnings']
                if 'error' in self.model[opt]:
                    del self.model[opt]['error']

    def do_copy(self, path: str, value: Any) -> None:
        """Copy *value* to the options listed in the ``copy`` form entry.

        Only options whose owner (from ``temp``, else from the model) is
        ``'default'`` are updated, and the copy is written to the model.
        """
        copy = self.form.get(path, {}).get('copy')
        if copy:
            for opt in copy:
                owner = self.temp.get(opt, {}).get('owner')
                if owner is None:
                    owner = self.model[opt]['owner']
                if owner == 'default':
                    self.model[opt]['value'] = value

    def send_data(self, updates):
        """Send the pending updates; must be implemented by a subclass.

        ``updates_value`` calls it with a dictionary holding ``updates`` and
        ``model`` and expects a mapping with a ``model`` list in return.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError('please implement send_data function')