"""Client-side option API built from a JSON description.

The JSON handed to :class:`Config` carries three parts that are read here:
``schema`` (option tree), ``model`` (per-option state keyed by path) and
``form`` (per-option UI rules such as ``pattern``, ``dependencies``,
``not_equal``, ``copy`` and ``remote``).  Local, not-yet-sent changes are
kept in ``Config.temp`` and queued in ``Config.updates``.
"""
from typing import Optional, Dict, List, Any
import warnings
import re

from .error import APIError, ValueWarning, ValueOptionError, ValueErrorWarning, PropertiesOptionError
from .setting import undefined
from .i18n import _


# Maps a schema type name to the Python type accepted by
# TiramisuOptionValue._validate.
TYPE = {'boolean': bool,
        'number': int,
        'string': str,
        'password': str,
        'domain': str}


class Option:
    """Minimal stand-in for a real option object.

    Only used to build warning/error objects: it is callable (returning
    itself, where a weakref is presumably expected) and exposes
    ``impl_getpath``.
    """
    # fake Option (IntOption, StrOption, ...)
    # only useful for warnings
    def __init__(self,
                 path):
        self.path = path

    def __call__(self):
        # supposed to be a weakref
        return self

    def impl_getpath(self):
        return self.path


class TiramisuOptionOption:
    """Read-only description of one option (``config.option(path).option``)."""
    # config.option(path).option
    def __init__(self,
                 config: 'Config',
                 path: str,
                 schema: Dict,
                 model: Dict,
                 form: Dict) -> None:
        self.config = config
        self._path = path
        self.schema = schema
        self.model = model
        self.form = form

    def doc(self):
        """Return the schema ``title`` of the option."""
        return self.schema['title']

    def path(self):
        """Return the full dotted path of the option."""
        return self._path

    def name(self,
             follow_symlink: bool=False) -> str:
        """Return the last component of the option path.

        When ``follow_symlink`` is true and the option is a symlink (and not
        an option description), the schema ``opt_path`` is used instead of
        the option's own path.
        """
        use_target = follow_symlink and \
            not self.isoptiondescription() and \
            self.issymlinkoption()
        path = self.schema['opt_path'] if use_target else self._path
        return path.rsplit('.', 1)[-1]

    def isoptiondescription(self):
        return self.schema['type'] in ['object', 'array']

    def ismasterslaves(self):
        return self.schema['type'] == 'array'

    def issymlinkoption(self) -> bool:
        return self.schema['type'] == 'symlink'

    def ismulti(self) -> bool:
        return self.schema.get('isMulti', False)

    def type(self) -> str:
        """Translate the schema type into the option type name.

        Raises:
            Exception: if the schema type has no known translation.
        """
        if self.ismasterslaves():
            return 'masterslaves'
        if self.isoptiondescription():
            return 'optiondescription'
        types = {'number': 'int',
                 'choice': 'choice',
                 'boolean': 'bool',
                 'password': 'password',
                 'date': 'date',
                 'domain': 'domainname',
                 'url': 'url',
                 'username': 'username',
                 'string': 'str',
                 'symlink': 'symlink'}
        schema_type = self.schema['type']
        if schema_type in types:
            return types[schema_type]
        raise Exception('unsupported type {}'.format(schema_type))

    def properties(self) -> List[str]:
        """Build the property list from the model, hidden state and form."""
        props = []
        model = self.model.get(self._path, {})
        if model.get('required'):
            # FIXME 'empty', 'needs_len'
            props.append('mandatory')
        if model.get('readOnly'):
            props.append('frozen')
        if self.config.get_hidden(self._path):
            props.append('hidden')
        if self.form.get(self._path, {}).get('clearable'):
            props.append('clearable')
        return props

    def requires(self) -> None:
        # FIXME
        return None


class TiramisuOptionProperty:
    """Property accessor of one option (``config.option(path).property``).

    ``model`` is the model entry of this single option (not the whole model
    mapping).
    """
    # config.option(path).property
    def __init__(self,
                 config: 'Config',
                 path: str,
                 model: Dict) -> None:
        # config is required by get() to resolve the hidden state
        self.config = config
        self.path = path
        self.model = model

    def get(self, only_raises=False):
        """Return the option properties.

        With ``only_raises`` set, the model-derived properties are skipped
        and only ``hidden`` can be reported.
        """
        # FIXME if slave:
        #if not isslave and childapi.option.ismulti():
        #    if 'empty' in props:
        #        obj['required'] = True
        #        props.remove('empty')
        #    if 'mandatory' in props:
        #        obj['needs_len'] = True
        #        props.remove('mandatory')
        if not only_raises:
            # copy so the model's own list is never mutated
            properties = self.model.get('properties', [])[:]
            if self.model.get('required', False):
                properties.append('mandatory')
            if self.model.get('readOnly', False):
                properties.append('frozen')
        else:
            properties = []
        if self.config.get_hidden(self.path):
            properties.append('hidden')
        return properties


class _Value:
    """Shared helpers for value accessors.

    Subclasses must provide ``config``, ``schema``, ``model``, ``temp`` and
    ``path`` attributes.
    """
    def _dict_walk(self,
                   ret: Dict,
                   schema: Dict,
                   root: str,
                   fullpath: bool,
                   withwarning: bool):
        """Fill ``ret`` with ``path -> value`` for every visible option.

        ``root`` and ``fullpath`` are only forwarded to recursive calls.
        """
        for key, option in schema['properties'].items():
            hidden = self.temp.get(key, {}).get('hidden', None)
            entry = self.model.get(key, {})
            model_visible = not entry.get('hidden', False) and \
                entry.get('display', True)
            # a temporary (local) hidden flag wins over the model
            if hidden is False or (hidden is None and model_visible):
                if option['type'] == 'object':
                    self._dict_walk(ret,
                                    option,
                                    root,
                                    fullpath,
                                    withwarning)
                else:
                    # NOTE(review): the container schema is passed here, not
                    # the option's own schema, so get_value's isMulti default
                    # is driven by the container - confirm this is intended.
                    value = self.config.get_value(key, schema)
                    self._display_warnings(key,
                                           value,
                                           option['type'],
                                           option['name'],
                                           withwarning)
                    ret[key] = value

    def dict(self,
             fullpath: bool=False,
             withwarning: bool=False):
        """Return a ``path -> value`` dict of the visible options."""
        ret = {}
        self._dict_walk(ret,
                        self.schema,
                        self.path,
                        fullpath,
                        withwarning)
        return ret

    def _display_warnings(self, path, value, type, name, withwarning=True):
        """Emit the model's errors (always) and warnings (if requested).

        Both are issued through ``warnings.warn_explicit``; ``name`` is
        currently unused.
        """
        entry = self.model.get(path, {})
        for err in entry.get('error', []):
            warnings.warn_explicit(ValueOptionError(value,
                                                    type,
                                                    Option(path),
                                                    '{0}'.format(err)),
                                   ValueErrorWarning,
                                   self.__class__.__name__, 0)

        if withwarning and entry.get('warnings'):
            for warn in entry.get('warnings'):
                # FIXME ValueWarning needs value and type attribute!
                warnings.warn_explicit(ValueWarning('{0}'.format(warn),
                                                    Option(path)),
                                       ValueWarning,
                                       self.__class__.__name__, 0)


class TiramisuOptionOwner:
    """Owner accessor of one option (``config.option(path).owner``)."""
    # config.option(path).owner
    def __init__(self,
                 config: 'Config',
                 schema: Dict,
                 model: Dict,
                 form: Dict,
                 temp: Dict,
                 path: str,
                 index: int) -> None:
        self.config = config
        self.schema = schema
        self.model = model
        self.form = form
        self.temp = temp
        self.path = path
        self.index = index

    def isdefault(self) -> Any:
        """Tell whether the option owner is ``'default'``."""
        return self.config.get_owner(self.path) == 'default'


class TiramisuOptionValue(_Value):
    """Value accessor of one option (``config.option(path).value``)."""
    # config.option(path).value
    def __init__(self,
                 config: 'Config',
                 schema: Dict,
                 model: Dict,
                 form: Dict,
                 temp: Dict,
                 path: str,
                 index: int) -> None:
        self.config = config
        self.schema = schema
        self.model = model
        self.form = form
        self.temp = temp
        self.path = path
        self.index = index

    def get(self) -> Any:
        """Return the current value and emit its errors/warnings."""
        value = self.config.get_value(self.path, self.schema)
        self._display_warnings(self.path,
                               value,
                               self.schema['type'],
                               self.schema['name'])
        return value

    def list(self):
        """Return the allowed values (schema ``enum``)."""
        return self.schema['enum']

    def _validate(self, type_, value):
        """Check one value against the schema type.

        ``None`` and ``undefined`` are always accepted.

        Raises:
            Exception: value not in the ``enum`` (choice) or of wrong type.
            KeyError: ``type_`` is neither ``'choice'`` nor a key of TYPE.
        """
        if value in [None, undefined]:
            return
        if type_ == 'choice':
            if value not in self.schema['enum']:
                raise Exception('value {} is not in {}'.format(value, self.schema['enum']))
        elif not isinstance(value, TYPE[type_]):
            raise Exception('value {} is not a valid {} '.format(value, type_))

    def set(self, value):
        """Validate ``value`` and record it as a modification.

        Raises:
            Exception: a multi option did not receive a list, or a value
                failed validation.
        """
        type_ = self.schema['type']
        if self.schema.get('isMulti', False):
            if not isinstance(value, list):
                raise Exception('value must be a list')
            for val in value:
                self._validate(type_, val)
        else:
            self._validate(type_, value)
        remote = self.form.get(self.path, {}).get('remote', False)
        self.config.modify_value(self.path,
                                 self.index,
                                 value,
                                 remote)
        self._display_warnings(self.path,
                               value,
                               type_,
                               self.schema['name'])

    def reset(self):
        """Record a deletion of the option value."""
        remote = self.form.get(self.path, {}).get('remote', False)
        self.config.delete_value(self.path,
                                 self.index,
                                 remote)


class _Option:
    """Shared listing helper for option containers.

    Subclasses must provide ``config``, ``schema``, ``model``, ``form``,
    ``temp`` and ``index`` attributes.
    """
    def list(self,
             type='option'):
        """Yield API objects for the visible direct children.

        ``type`` is ``'option'`` (leaf options only) or ``'all'`` (option
        descriptions too).  As this is a generator, ``NotImplementedError``
        for any other value is raised on first iteration.
        """
        if type not in ['all', 'option']:
            raise NotImplementedError()
        for path, schema in self.schema['properties'].items():
            is_description = schema['type'] in ['object', 'array']
            if type != 'all' and is_description:
                continue
            hidden = self.temp.get(path, {}).get('hidden', None)
            entry = self.model.get(path, {})
            model_visible = not entry.get('hidden', False) and \
                entry.get('display', True)
            # strict "is True": a non-boolean 'display' value does not count
            if hidden is False or (hidden is None and model_visible is True):
                if is_description:
                    yield TiramisuOptionDescription(self.config,
                                                    schema,
                                                    self.model,
                                                    self.form,
                                                    self.temp,
                                                    path)
                else:
                    yield TiramisuOption(self.config,
                                         schema,
                                         self.model,
                                         self.form,
                                         self.temp,
                                         path,
                                         self.index)


class TiramisuOptionDescription(_Option):
    """API object returned by ``config.option(path)`` for a description."""
    # config.option(path) (with path == OptionDescription)
    def __init__(self,
                 config: 'Config',
                 schema: Dict,
                 model: Dict,
                 form: Dict,
                 temp: Dict,
                 path: str) -> None:
        self.config = config
        self.schema = schema
        self.model = model
        self.form = form
        self.temp = temp
        self.path = path
        self.index = None

    def __getattr__(self,
                    subfunc: str) -> Any:
        """Resolve the ``option`` / ``property`` / ``value`` sub-APIs.

        Raises:
            APIError: for any other attribute name.
        """
        if subfunc == 'option':
            return TiramisuOptionOption(self.config,
                                        self.path,
                                        self.schema,
                                        self.model,
                                        self.form)
        if subfunc == 'property':
            return TiramisuOptionProperty(self.config,
                                          self.path,
                                          self.model.get(self.path, {}))
        if subfunc == 'value':
            return TiramisuOptionValue(self.config,
                                       self.schema,
                                       self.model,
                                       self.form,
                                       self.temp,
                                       self.path,
                                       self.index)
        raise APIError(_('please specify a valid sub function ({})').format(subfunc))

    def group_type(self):
        """Return ``'default'`` when visible.

        Raises:
            PropertiesOptionError: when the description is hidden.
        """
        hidden = self.temp.get(self.path, {}).get('hidden', None)
        entry = self.model.get(self.path, {})
        model_visible = not entry.get('hidden', False) and \
            entry.get('display', True)
        if hidden is False or (hidden is None and model_visible):
            # FIXME
            return 'default'
        raise PropertiesOptionError(None, None, None, opt_type='optiondescription')


class TiramisuOption:
    """API object returned by ``config.option(path)`` for a leaf option."""
    # config.option(path) (with path == Option)
    def __init__(self,
                 config: 'Config',
                 schema: Dict,
                 model: Dict,
                 form: Dict,
                 temp: Dict,
                 path: str,
                 index: Optional[int]) -> None:
        self.config = config
        self.schema = schema
        self.model = model
        self.form = form
        self.temp = temp
        self.path = path
        self.index = index

    def __getattr__(self,
                    subfunc: str) -> Any:
        """Resolve the ``option`` / ``value`` / ``owner`` / ``property`` sub-APIs.

        Raises:
            NotImplementedError: ``option`` or ``property`` requested with
                an index.
            APIError: for any other attribute name.
        """
        if subfunc == 'option':
            if self.index is not None:
                raise NotImplementedError()
            return TiramisuOptionOption(self.config,
                                        self.path,
                                        self.schema,
                                        self.model,
                                        self.form)
        if subfunc == 'value':
            return TiramisuOptionValue(self.config,
                                       self.schema,
                                       self.model,
                                       self.form,
                                       self.temp,
                                       self.path,
                                       self.index)
        if subfunc == 'owner':
            return TiramisuOptionOwner(self.config,
                                       self.schema,
                                       self.model,
                                       self.form,
                                       self.temp,
                                       self.path,
                                       self.index)
        if subfunc == 'property':
            if self.index is not None:
                raise NotImplementedError()
            return TiramisuOptionProperty(self.config,
                                          self.path,
                                          self.model.get(self.path, {}))
        raise APIError(_('please specify a valid sub function ({})').format(subfunc))


class TiramisuContextProperty:
    """Context-level properties (``config.property``)."""
    # config.property
    # def __init__(self,
    #              json):
    #     self.json = json

    def get(self):
        # FIXME ?
        return ['demoting_error_warning']


class ContextOption(_Option):
    """Entry point for options (``config.option``)."""
    # config.option
    def __init__(self,
                 config: 'Config',
                 model: Dict,
                 form: Dict,
                 schema: Dict,
                 temp: Dict) -> None:
        self.config = config
        self.model = model
        self.form = form
        # wrapped so that _Option.list can iterate 'properties'
        self.schema = {'properties': schema}
        self.temp = temp
        self.index = None

    def __call__(self,
                 path: str,
                 index: Optional[int]=None) -> TiramisuOption:
        """Return the API object matching the schema type found at ``path``."""
        schema = self.config.get_schema(path)
        if schema['type'] in ['object', 'array']:
            return TiramisuOptionDescription(self.config,
                                             schema,
                                             self.model,
                                             self.form,
                                             self.temp,
                                             path)
        return TiramisuOption(self.config,
                              schema,
                              self.model,
                              self.form,
                              self.temp,
                              path,
                              index)


class ContextValue(_Value):
    """Entry point for values (``config.value``)."""
    # config.value
    def __init__(self,
                 config: 'Config',
                 model: Dict,
                 form: Dict,
                 schema: Dict,
                 temp: Dict) -> None:
        self.config = config
        self.model = model
        self.form = form
        first = next(iter(schema.keys()))
        # everything before the last dot of the first schema key
        self.path = first.rsplit('.', 1)[0]
        self.schema = {'properties': schema}
        self.temp = temp

    def __call__(self) -> TiramisuOptionValue:
        """Return a value accessor bound to this context's path, no index."""
        return TiramisuOptionValue(self.config,
                                   self.schema,
                                   self.model,
                                   self.form,
                                   self.temp,
                                   self.path,
                                   None)

    def mandatory(self):
        """Yield the paths of required options that have no usable value.

        A required option is reported when its value is ``None`` or when it
        is a list containing ``None`` or an empty string.
        """
        for key, value in self.dict().items():
            if not self.model.get(key, {}).get('required'):
                continue
            if value is None or \
                    (isinstance(value, list) and (None in value or '' in value)):
                yield key


class Config:
    """Root object holding schema, model, form and pending updates."""
    # config
    def __init__(self,
                 json):
        """Index the ``model`` and ``form`` lists of ``json`` by option key.

        Form ``pattern`` strings are compiled in place (the passed form
        entries are mutated).
        """
        self.model_ori = json['model']
        self.model = {option['key']: option for option in json['model']}
        self.form = {}
        for option in json['form']:
            if 'key' in option:
                if 'pattern' in option:
                    option['pattern'] = re.compile(option['pattern'])
                self.form[option['key']] = option
        self.temp = {}
        self.schema = json['schema']
        self.updates = []
        first_path = next(iter(self.schema.keys()))
        if '.' in first_path:
            self.root = first_path.rsplit('.', 1)[0]
        else:
            self.root = ''

    def __getattr__(self,
                    subfunc: str) -> Any:
        """Resolve the ``property`` / ``option`` / ``value`` sub-APIs.

        Raises:
            APIError: for any other (missing) attribute name.
        """
        if subfunc == 'property':
            return TiramisuContextProperty()
        if subfunc == 'option':
            return ContextOption(self,
                                 self.model,
                                 self.form,
                                 self.schema,
                                 self.temp)
        if subfunc == 'value':
            return ContextValue(self,
                                self.model,
                                self.form,
                                self.schema,
                                self.temp)
        raise APIError(_('please specify a valid sub function ({})').format(subfunc))

    def add_value(self,
                  path: str,
                  index: Optional[int],
                  value: Any,
                  remote: bool) -> None:
        """Queue an ``add`` update."""
        self.updates_value('add',
                           path,
                           index,
                           value,
                           remote,
                           None)

    def modify_value(self,
                     path: str,
                     index: Optional[int],
                     value: Any,
                     remote: bool) -> None:
        """Queue a ``modify`` update.

        A trailing ``undefined`` in a list value is replaced in place by the
        schema ``defaultvalue`` or, failing that, by the schema ``value`` at
        the same position (``None`` if there is none).
        """
        schema = self.get_schema(path)
        if value and isinstance(value, list) and value[-1] is undefined:
            new_value = schema.get('defaultvalue')
            if new_value is None:
                len_value = len(value)
                schema_value = schema.get('value', [])
                if len(schema_value) >= len_value:
                    new_value = schema_value[len_value - 1]
            value[-1] = new_value

        self.updates_value('modify',
                           path,
                           index,
                           value,
                           remote,
                           None)

    def delete_value(self,
                     path: str,
                     index: Optional[int],
                     remote: bool) -> None:
        """Queue a ``delete`` update carrying the schema value as new value."""
        schema = self.get_schema(path)
        value = schema.get('value')
        if value is None and schema.get('isMulti', False):
            value = []
        self.updates_value('delete',
                           path,
                           index,
                           value,
                           remote,
                           None)

    def get_schema(self,
                   path):
        """Return the schema node found at the dotted ``path``.

        Schema keys are full dotted paths, so the lookup key is rebuilt one
        component at a time starting from ``self.root``.

        Raises:
            Exception: ``path`` is not located under ``self.root``.
            KeyError: an intermediate or final path is absent from the schema.
        """
        root_path = self.root
        schema = {'properties': self.schema,
                  'type': 'object'}
        parts = path.split('.')
        if root_path:
            root = root_path.split('.')
            # compare whole components so 'rootx.a' is not taken for 'root.a'
            if parts[:len(root)] != root:
                raise Exception('cannot find {0}'.format(path))
            subpaths = parts[len(root):]
        else:
            subpaths = parts
        for subpath in subpaths:
            if root_path:
                root_path += '.' + subpath
            else:
                root_path = subpath
            schema = schema['properties'][root_path]
        return schema

    def get_hidden(self,
                   path: str) -> bool:
        """Return the ``hidden`` flag, the temporary one taking precedence.

        May return ``None`` when neither temp nor model defines it.
        """
        property_ = 'hidden'
        if property_ in self.temp.get(path, {}):
            return self.temp[path][property_]
        return self.model.get(path, {}).get(property_)

    def get_value(self,
                  path: str,
                  schema: Dict) -> Any:
        """Return the value, the temporary one taking precedence.

        A missing value becomes ``[]`` when ``schema`` is flagged ``isMulti``.
        """
        if 'value' in self.temp.get(path, {}):
            value = self.temp[path]['value']
        else:
            value = self.model.get(path, {}).get('value')
        if value is None and schema.get('isMulti', False):
            value = []
        return value

    def get_owner(self,
                  path: str) -> str:
        """Return the owner (temporary first), ``'default'`` if unknown."""
        if 'owner' in self.temp.get(path, {}):
            return self.temp[path]['owner']
        return self.model.get(path, {}).get('owner', 'default')

    def updates_value(self,
                      action: str,
                      path: str,
                      index: Optional[int],
                      value: Optional[Any],
                      remote: bool,
                      masterslaves: Optional[str]) -> None:
        """Queue an update, merging it with the previous one when possible.

        After queuing, the value is checked against the form ``pattern`` (if
        any).  When it matches, the change is either sent through
        ``send_data`` (``remote``) or stored in ``self.temp``, and the
        dependency / not_equal / copy rules are applied.
        """
        update_last_action = False
        if self.updates:
            last_body = self.updates[-1]
            if last_body['name'] == path:
                if index is None and 'index' not in last_body:
                    last_action = last_body['action']
                    if last_action == action or \
                            (last_action in ['delete', 'modify'] and
                             action in ['delete', 'modify']):
                        update_last_action = True
                elif index is None and action == 'delete':
                    # drop the trailing run of updates superseded by this
                    # whole-option delete; removing the last element while
                    # iterating in reverse always removes the current one
                    # NOTE(review): the masterslaves clause does not depend
                    # on `update`; confirm it should not test update['name'].
                    for update in reversed(self.updates):
                        if (masterslaves is None and update['name'] == path) or \
                                (masterslaves and path.startswith(masterslaves + '.')):
                            del self.updates[-1]
                        else:
                            break
                # .get(): the previous update may carry no index at all
                elif last_body.get('index') == index:
                    last_action = last_body['action']
                    if last_action == 'add' and action == 'modify':
                        action = 'add'
                        update_last_action = True
                    elif (last_action == action and action != 'delete') or \
                            (last_action == 'modify' and action == 'delete'):
                        update_last_action = True
                    elif last_action == 'add' and action == 'delete':
                        del self.updates[-1]

        if update_last_action:
            if value is None:
                if 'value' in last_body:
                    del last_body['value']
            else:
                last_body['value'] = value
            if index is None and 'index' in last_body:
                del last_body['index']
            last_body['action'] = action
        else:
            data = {'action': action,
                    'name': path}
            if value is not None:
                data['value'] = value
            if index is not None:
                data['index'] = index
            self.updates.append(data)

        # lists still holding an `undefined` placeholder are not pattern-checked
        if 'pattern' in self.form.get(path, {}) and \
                (not isinstance(value, list) or undefined not in value):
            match = self.test_value(path, value, remote)
        else:
            match = True
        if match:
            if remote:
                ret = self.send_data({'updates': self.updates,
                                      'model': self.model_ori})
                self.updates = []
                self.temp.clear()
                # FIXME remove old key ?
                for model in ret['model']:
                    self.model[model['key']] = model
                self.model_ori = ret['model']
            else:
                self.temp.setdefault(path, {})['owner'] = 'tmp'
                self.temp[path]['value'] = value
            self.set_dependencies(path, value)
            self.set_not_equal(path, value)
            self.do_copy(path, value)

    def test_value(self,
                   path: str,
                   value: Any,
                   remote: bool):
        """Check ``value`` (or each item of a list) against the form pattern.

        Returns a truthy value on success (``True`` or the regex match
        object) and a falsy one otherwise.  For non-remote options the
        temporary ``error`` entry is set on failure and cleared on success.
        """
        if isinstance(value, list):
            for val in value:
                if not self.test_value(path, val, remote):
                    return False
            return True
        if value is None:
            match = True
        else:
            match = self.form[path]['pattern'].search(value)
        if not remote:
            if not match:
                self.temp.setdefault(path, {})['error'] = ['']
            elif 'error' in self.temp.get(path, {}):
                # the error lives in temp, so that is where it is cleared
                del self.temp[path]['error']
        return match

    def set_dependencies(self,
                         path: str,
                         ori_value: Any,
                         force_hide: bool=False) -> None:
        """Apply the form ``dependencies`` rules of ``path``.

        Sets the temporary ``hidden`` flag of every dependent path and
        recurses into it; once a path is hidden, ``force_hide`` keeps its
        own dependents hidden as well.
        """
        dependencies = self.form.get(path, {}).get('dependencies', {})
        if not dependencies:
            return
        if ori_value in dependencies['expected']:
            expected = dependencies['expected'][ori_value]
        else:
            expected = dependencies['default']
        for action in ['hide', 'show']:
            expected_actions = expected.get(action)
            if not expected_actions:
                continue
            hidden = True if force_hide else action == 'hide'
            for expected_path in expected_actions:
                self.temp.setdefault(expected_path, {})['hidden'] = hidden
                if 'value' in self.temp[expected_path]:
                    value = self.temp[expected_path]['value']
                else:
                    value = self.model[expected_path].get('value')
                self.set_dependencies(expected_path, value, hidden)

    def set_not_equal(self,
                      path: str,
                      value: Any) -> None:
        """Apply the form ``not_equal`` rule of ``path``.

        Collects the values of ``path`` and of every option listed in the
        rule; options sharing a non-``None`` value get a message appended to
        their model ``warnings`` or ``error`` list.  When nothing collides,
        both lists are removed from the model of the involved options.
        """
        not_equal = self.form.get(path, {}).get('not_equal', {})
        if not not_equal:
            return
        # vals[i] is a value held by option opts[i]
        vals = []
        opts = []

        def collect(opt_path, opt_value):
            if isinstance(opt_value, list):
                for val in opt_value:
                    vals.append(val)
                    opts.append(opt_path)
            else:
                vals.append(opt_value)
                opts.append(opt_path)

        collect(path, value)
        for path_ in not_equal['options']:
            collect(path_, self.get_value(path_, self.get_schema(path_)))

        warnings_only = not_equal.get('warnings', False)
        if warnings_only:
            msg = _('should be different from the value of "{}"')
            msgcurr = _('value for {} should be different')
        else:
            msg = _('must be different from the value of "{}"')
            msgcurr = _('value for {} must be different')

        equal = []
        for idx_inf, val_inf in enumerate(vals):
            for idx_sup in range(idx_inf + 1, len(vals)):
                val_sup = vals[idx_sup]
                # equal values collide unless they are None
                if val_inf == val_sup and val_sup is not None:
                    for opt_ in [opts[idx_inf], opts[idx_sup]]:
                        if opt_ not in equal:
                            equal.append(opt_)
        if equal:
            equal_name = {opt: self.get_schema(opt)['title'] for opt in equal}
            for opt_ in equal:
                # titles of the *other* colliding options
                display_equal = ', '.join(equal_name[opt__]
                                          for opt__ in equal
                                          if opt_ != opt__)
                if opt_ == path:
                    msg_ = msgcurr.format(display_equal)
                else:
                    msg_ = msg.format(display_equal)
                key = 'warnings' if warnings_only else 'error'
                self.model[opt_].setdefault(key, []).append(msg_)
        else:
            for opt in opts:
                if 'warnings' in self.model[opt]:
                    del self.model[opt]['warnings']
                if 'error' in self.model[opt]:
                    del self.model[opt]['error']

    def do_copy(self,
                path: str,
                value: Any) -> None:
        """Copy ``value`` into the form ``copy`` targets still at default."""
        copy = self.form.get(path, {}).get('copy')
        if copy:
            for opt in copy:
                owner = self.get_owner(opt)
                if owner == 'default':
                    # do not change in self.temp, it's default value
                    self.model[opt]['value'] = value

    def send_data(self,
                  updates):
        """Send the pending updates; must be implemented by a subclass."""
        raise NotImplementedError('please implement send_data function')