from typing import Optional, Dict, List, Any from copy import copy import warnings import re from .error import APIError, ValueWarning, ValueOptionError, ValueErrorWarning, PropertiesOptionError from .setting import undefined from .i18n import _ TYPE = {'boolean': bool, 'integer': int, 'string': str, 'password': str, 'domain': str} class Option: # fake Option (IntOption, StrOption, ...) # only usefull for warnings def __init__(self, path): self.path = path def __call__(self): # suppose to be a weakref return self def impl_getpath(self): return self.path class TiramisuOptionOption: # config.option(path).option def __init__(self, config: 'Config', path: str, schema: Dict, model: Dict, form: Dict) -> None: self.config = config self._path = path self.schema = schema self.model = model self.form = form def doc(self): return self.schema['title'] def path(self): return self._path def name(self, follow_symlink: bool=False) -> str: if not follow_symlink or \ self.isoptiondescription() or \ not self.issymlinkoption(): path = self._path else: path = self.schema['opt_path'] return path.rsplit('.', 1)[-1] def isoptiondescription(self): return self.schema['type'] in ['object', 'array'] def isleadership(self): return self.schema['type'] == 'array' def isleader(self): if '.' in self._path: parent_schema = self.config.get_schema(self._path.rsplit('.', 1)[0]) if parent_schema['type'] == 'array': leader = next(iter(parent_schema['properties'].keys())) return leader == self._path return False def isfollower(self): return self.config.isfollower(self._path) def issymlinkoption(self) -> bool: return self.schema['type'] == 'symlink' def ismulti(self) -> bool: return self.schema.get('isMulti', False) def type(self) -> str: if self.isleadership(): return 'leadership' if self.isoptiondescription(): return 'optiondescription' return self.schema['type'] def properties(self) -> List[str]: model = self.model.get(self._path, {}) return self.config.get_properties(self.model, self._path, None) def requires(self) -> None: # FIXME return None class TiramisuOptionProperty: # config.option(path).property def __init__(self, config: 'Config', path: str, index: Optional[int], model: Dict) -> None: self.config = config self.path = path self.index = index self.model = model def get(self, only_raises=False): if not only_raises: props = self.config.get_properties(self.model, self.path, self.index, only_raises) else: props = [] if self.config.get_hidden(self.path, self.index): props.append('hidden') return props class _Value: def _dict_walk(self, ret: Dict, schema: Dict, root: str, fullpath: bool, withwarning: bool) -> None: leadership_len = None for key, option in schema['properties'].items(): hidden = self.temp.get(key, {}).get('hidden', None) model_display = not self.model.get(key, {}).get('hidden', False) and \ self.model.get(key, {}).get('display', True) if hidden is False or (hidden is None and model_display is True): if option['type'] in ['object', 'array']: # optiondescription or leadership self._dict_walk(ret, option, root, fullpath, withwarning) elif schema.get('type') == 'array' and leadership_len is not None: # followers values = [] for index in range(leadership_len): value = self.config.get_value(key, index) self._display_warnings(key, value, option['type'], option['name'], withwarning) values.append(value) ret[key] = values else: value = self.config.get_value(key) self._display_warnings(key, value, option['type'], option['name'], withwarning) ret[key] = value if schema.get('type') == 'array': leadership_len = len(value) elif schema.get('type') == 'array' and leadership_len is None: # if leader is hidden, followers are hidden too break def dict(self, fullpath: bool=False, withwarning: bool=False): ret = {} self._dict_walk(ret, self.schema, self.path, fullpath, withwarning) return ret def _display_warnings(self, path, value, type, name, withwarning=True): for err in self.model.get(path, {}).get('error', []): warnings.warn_explicit(ValueErrorWarning(value, type, Option(path), '{0}'.format(err)), ValueErrorWarning, self.__class__.__name__, 0) if withwarning and self.model.get(path, {}).get('warnings'): for warn in self.model.get(path, {}).get('warnings'): warnings.warn_explicit(ValueErrorWarning(value, type, Option(path), '{0}'.format(err)), ValueErrorWarning, self.__class__.__name__, 0) class TiramisuOptionOwner: # config.option(path).owner def __init__(self, config: 'Config', schema: Dict, model: List[Dict], form: List[Dict], temp: List[Dict], path: str, index: int) -> None: self.config = config self.schema = schema self.model = model self.form = form self.temp = temp self.path = path self.index = index def isdefault(self) -> Any: return self.config.get_owner(self.path, self.index) == 'default' def get(self) -> str: return self.config.get_owner(self.path, self.index) class TiramisuOptionValue(_Value): # config.option(path).value def __init__(self, config: 'Config', schema: Dict, model: List[Dict], form: List[Dict], temp: List[Dict], path: str, index: int) -> None: self.config = config self.schema = schema self.model = model self.form = form self.temp = temp self.path = path self.index = index def get(self) -> Any: if self.config.isfollower(self.path): if self.index is None: raise APIError(_('index must be set with the follower option "{}"').format(self.path)) value = self.config.get_value(self.path, self.index) self._display_warnings(self.path, value, self.schema['type'], self.schema['name']) return value if self.index is not None: raise APIError(_('index must only be set with a follower option, not for "{}"').format(self.path)) value = self.config.get_value(self.path) self._display_warnings(self.path, value, self.schema['type'], self.schema['name']) return value def list(self): return self.schema['enum'] def _validate(self, type_, value): if value in [None, undefined]: return if type_ == 'choice': if value not in self.schema['enum']: raise Exception('value {} is not in {}'.format(value, self.schema['enum'])) elif not isinstance(value, TYPE[type_]): raise Exception('value {} is not a valid {} '.format(value, type_)) def set(self, value): type_ = self.schema['type'] remote = self.form.get(self.path, {}).get('remote', False) if self.index is None and self.schema.get('isMulti', False): if not isinstance(value, list): raise Exception('value must be a list') for val in value: self._validate(type_, val) else: self._validate(type_, value) self.config.modify_value(self.path, self.index, value, remote) self._display_warnings(self.path, value, type_, self.schema['name']) def reset(self): remote = self.form.get(self.path, {}).get('remote', False) self.config.delete_value(self.path, self.index, remote) def default(self): return self.schema.get('value') class _Option: def list(self, type='option'): if type not in ['all', 'option']: raise NotImplementedError() for path, schema in self.schema['properties'].items(): if type == 'all' or schema['type'] not in ['object', 'array']: hidden = self.temp.get(path, {}).get('hidden', None) model_display = not self.model.get(path, {}).get('hidden', False) and \ self.model.get(path, {}).get('display', True) if hidden is False or (hidden is None and model_display is True): if schema['type'] in ['object', 'array']: yield TiramisuOptionDescription(self.config, schema, self.model, self.form, self.temp, path) else: yield TiramisuOption(self.config, schema, self.model, self.form, self.temp, path, self.index) class TiramisuOptionDescription(_Option): # config.option(path) (with path == OptionDescription) def __init__(self, config: 'Config', schema: Dict, model: List[Dict], form: List[Dict], temp: List[Dict], path: str) -> None: self.config = config self.schema = schema self.model = model self.form = form self.temp = temp self.path = path self.index = None def __getattr__(self, subfunc: str) -> Any: if subfunc == 'option': return TiramisuOptionOption(self.config, self.path, self.schema, self.model, self.form) if subfunc == 'property': return TiramisuOptionProperty(self.config, self.path, self.model.get(self.path, {})) if subfunc == 'value': return TiramisuOptionValue(self.config, self.schema, self.model, self.form, self.temp, self.path, self.index) raise APIError(_('please specify a valid sub function ({})').format(subfunc)) def group_type(self): hidden = self.temp.get(self.path, {}).get('hidden', None) model_display = not self.model.get(self.path, {}).get('hidden', False) and \ self.model.get(self.path, {}).get('display', True) if hidden is False or (hidden is None and model_display): # FIXME return 'default' raise PropertiesOptionError(None, None, None, opt_type='optiondescription') class TiramisuOption: # config.option(path) (with path == Option) def __init__(self, config: 'Config', schema: Dict, model: List[Dict], form: List[Dict], temp: List[Dict], path: str, index: Optional[int]) -> None: self.config = config self.schema = schema self.model = model self.form = form self.temp = temp self.path = path self.index = index def __getattr__(self, subfunc: str) -> Any: if subfunc == 'option': if self.index != None: raise NotImplementedError() return TiramisuOptionOption(self.config, self.path, self.schema, self.model, self.form) if subfunc == 'value': return TiramisuOptionValue(self.config, self.schema, self.model, self.form, self.temp, self.path, self.index) if subfunc == 'owner': return TiramisuOptionOwner(self.config, self.schema, self.model, self.form, self.temp, self.path, self.index) if subfunc == 'property': return TiramisuOptionProperty(self.config, self.path, self.index, self.model.get(self.path, {})) raise APIError(_('please specify a valid sub function ({})').format(subfunc)) class TiramisuContextProperty: # config.property # def __init__(self, # json): # self.json = json def get(self): # FIXME ? return ['demoting_error_warning'] class ContextOption(_Option): # config.option def __init__(self, config: 'Config', model: Dict, form: Dict, schema: Dict, temp: Dict) -> None: self.config = config self.model = model self.form = form self.schema = {'properties': schema} self.temp = temp self.index = None def __call__(self, path: str, index: Optional[int]=None) -> TiramisuOption: schema = self.config.get_schema(path) if schema['type'] in ['object', 'array']: return TiramisuOptionDescription(self.config, schema, self.model, self.form, self.temp, path) return TiramisuOption(self.config, schema, self.model, self.form, self.temp, path, index) class ContextValue(_Value): # config.value def __init__(self, config: 'Config', model: Dict, form: Dict, schema: Dict, temp: Dict) -> None: self.config = config self.model = model self.form = form first = next(iter(schema.keys())) self.path = first.rsplit('.', 1)[0] self.schema = {'properties': schema} self.temp = temp def __call__(self) -> TiramisuOptionValue: return TiramisuOptionValue(self.config, self.schema, self.model, self.form, self.temp, path, index) def mandatory(self): for key, value in self.dict().items(): if self.model.get(key, {}).get('required') and \ value is None or \ (self.schema.get('isMulti') and (None in value or '' in value)): yield key class Config: # config def __init__(self, json): self.model_ori = json['model'] self.gen_model(json['model']) self.form = {} for option in json['form']: if 'key' in option: if 'pattern' in option: option['pattern'] = re.compile(option['pattern']) self.form[option['key']] = option self.temp = {} self.schema = json['schema'] self.updates = [] first_path = next(iter(self.schema.keys())) if '.' in first_path: self.root = first_path.rsplit('.', 1)[0] else: self.root = '' def gen_model(self, model) -> List[Dict]: self.model = {} for option in model: self.update_model(option) def update_model(self, model): key = model['key'] if 'index' in model: if key not in self.model: self.model[key] = copy(model) self.model[key]['value'] = {} del self.model[key]['index'] del self.model[key]['owner'] if 'hidden' in self.model[key]: del self.model[key]['hidden'] if model.get('hidden') is True: self.model[key]['value'][model['index']] = () else: self.model[key]['value'][model['index']] = (model['value'], model['owner']) else: self.model[key] = model def __getattr__(self, subfunc: str) -> Any: if subfunc == 'property': return TiramisuContextProperty() if subfunc == 'option': return ContextOption(self, self.model, self.form, self.schema, self.temp) if subfunc == 'value': return ContextValue(self, self.model, self.form, self.schema, self.temp) raise APIError(_('please specify a valid sub function ({})').format(subfunc)) def add_value(self, path: str, index: Optional[int], value: Any, remote: bool) -> None: self.updates_value('add', path, index, value, remote, None) def modify_value(self, path: str, index: Optional[int], value: Any, remote: bool) -> None: schema = self.get_schema(path) if value and isinstance(value, list) and value[-1] is undefined: new_value = schema.get('defaultvalue') if new_value is None: len_value = len(value) schema_value = schema.get('value', []) if len(schema_value) >= len_value: new_value = schema_value[len_value - 1] value[-1] = new_value self.updates_value('modify', path, index, value, remote, None) def delete_value(self, path: str, index: Optional[int], remote: bool) -> None: self.updates_value('delete', path, index, None, remote, None) def get_properties(self, model, path, index, only_raises=True): props = model.get('properties', [])[:] if model.get('required'): if self.get_schema(path).get('isMulti', False): props.append('empty') else: props.append('mandatory') if model.get('needs_len'): props.append('mandatory') if model.get('readOnly'): props.append('frozen') if only_raises and self.get_hidden(path, index): props.append('hidden') if self.form.get(path, {}).get('clearable'): props.append('clearable') return props def get_schema(self, path): root_path = self.root schema = {'properties': self.schema, 'type': 'object'} if root_path: root = self.root.split('.') if not path.startswith(self.root): raise Exception('cannot find {0}'.format(path)) subpaths = path.split('.')[len(root):] else: subpaths = path.split('.') for subpath in subpaths: if root_path: root_path += '.' + subpath else: root_path = subpath schema = schema['properties'][root_path] return schema def isfollower(self, path: str) -> bool: if '.' in path: parent_schema = self.get_schema(path.rsplit('.', 1)[0]) leader = next(iter(parent_schema['properties'].keys())) if parent_schema['type'] == 'array' and \ leader != path: return True return False def get_hidden(self, path: str, index: Optional[int]) -> bool: property_ = 'hidden' if property_ in self.temp.get(path, {}): value = self.temp[path][property_] else: if index is None: value = self.model.get(path, {}).get(property_, False) else: value = self.model.get(path, {}).get(property_, False) or \ self.model.get(path, {}).get('value', {}).get(index) == () return value def get_value(self, path: str, index: int=None) -> Any: if index is None: if 'value' in self.temp.get(path, {}): value = self.temp[path]['value'] else: value = self.model.get(path, {}).get('value') if value is None and self.get_schema(path).get('isMulti', False): value = [] else: if index in self.temp.get(path, {}).get('value', {}): value = self.temp[path].get('value') else: value = self.model.get(path, {}).get('value') if value is not None: if index in value: if len(value[index]): value = value[index][0] else: value = PropertiesOptionError(None, None, None, opt_type='option') else: value = self.get_schema(path).get('default') else: value = None return value def get_owner(self, path: str, index: int) -> str: if not self.isfollower(path): if 'owner' in self.temp.get(path, {}): owner = self.temp[path]['owner'] else: owner = self.model.get(path, {}).get('owner', 'default') else: if 'value' in self.temp.get(path, {}): value = self.temp[path]['value'] else: value = self.model.get(path, {}).get('value', {}) if index in value: if not value[index]: raise PropertiesOptionError(None, None, None, opt_type='option') owner = value[index][1] else: owner = 'default' return owner def updates_value(self, action: str, path: str, index: Optional[int], value: Optional[Any], remote: bool, leadership: Optional[str]) -> None: update_last_action = False if self.updates: last_body = self.updates[-1] if last_body['name'] == path: if index is None and not 'index' in last_body: last_action = last_body['action'] if last_action == action or \ last_action in ['delete', 'modify'] and action in ['delete', 'modify']: update_last_action = True elif index == None and action == 'delete': for update in reversed(self.updates): if leadership is None and update['name'] == path or \ leadership and path.startswith(leadership + '.'): del self.updates[-1] else: break elif last_body['index'] == index: if last_body['action'] == 'add' and action == 'modify': action = 'add' update_last_action = True elif last_body['action'] == action and action != 'delete' or \ last_body['action'] == 'modify' and action == 'delete': update_last_action = True elif last_body['action'] == 'add' and action == 'delete': del self.updates[-1] if update_last_action: if action == 'delete' and value is None: if 'value' in last_body: del last_body['value'] else: last_body['value'] = value if index is None and 'index' in last_body: del last_body['index'] last_body['action'] = action else: data = {'action': action, 'name': path} if action != 'delete' and value is not None: data['value'] = value if index is not None: data['index'] = index self.updates.append(data) if 'pattern' in self.form.get(path, {}) and (not isinstance(value, list) or undefined not in value): match = self.test_value(path, value, remote) else: match = True if match: if remote: self.updates_data(self.send_data({'updates': self.updates, 'model': self.model_ori})) else: if action == 'delete': self.temp.setdefault(path, {})['owner'] = 'tmp' if index is None: value = self.default_value(path) self.temp[path]['value'] = value if self.option(path).option.isleader(): leadership_path = path.rsplit('.', 1)[0] parent_schema = self.get_schema(leadership_path) iter_leadership = list(parent_schema['properties'].keys()) for follower in iter_leadership[1:]: for idx in range(len(value)): follower_value = self.get_schema(follower).get('default') # FIXME PropertiesOptionError? self.temp.setdefault(follower, {}).setdefault('value', {})[idx] = (follower_value, 'default') elif self.option(path).option.isleader(): old_value = self.option(path).value.get() old_value.pop(index) self.temp[path]['value'] = old_value leadership_path = path.rsplit('.', 1)[0] parent_schema = self.get_schema(leadership_path) iter_leadership = list(parent_schema['properties'].keys()) for follower in iter_leadership[1:]: if index in self.temp.get(follower, {}).get('value', {}): del self.temp[follower]['value'][index] if index in self.model.get(follower, {}).get('value', {}): del self.model[follower]['value'][index] else: if index in self.temp.get(path, {}).get('value', {}): del self.temp[path]['value'][index] if index in self.model.get(path, {}).get('value', {}): del self.model[path]['value'][index] elif index is None: self.temp.setdefault(path, {})['owner'] = 'tmp' self.temp[path]['value'] = value else: self.temp.setdefault(path, {}).setdefault('value', {})[index] = (value, 'tmp') self.set_dependencies(path, value) self.set_not_equal(path, value) self.do_copy(path, value) def default_value(self, path): schema = self.get_schema(path) value = schema.get('value'); if value is None and schema.get('isMulti', False): value = [] return value def updates_data(self, data): self.updates = [] self.temp.clear() for key in data['updates']: if key in self.model: del self.model[key] for model in data['model']: if key == model['key']: self.update_model(model) self.model_ori = data['model'] def test_value(self, path: str, value: Any, remote: bool): if isinstance(value, list): for val in value: if not self.test_value(path, val, remote): return False return True else: if value is None: match = True else: match = self.form[path]['pattern'].search(value) if not remote: if not match: self.temp.setdefault(path, {})['error'] = [''] elif 'error' in self.model[path]: del self.temp[path]['error'] return match def set_dependencies(self, path: str, ori_value: Any, force_hide: bool=False) -> None: dependencies = self.form.get(path, {}).get('dependencies', {}) if dependencies: if ori_value in dependencies['expected']: expected = dependencies['expected'][ori_value] else: expected = dependencies['default'] for action in ['hide', 'show']: expected_actions = expected.get(action) if expected_actions: if force_hide: hidden = True else: hidden = action == 'hide' for expected_path in expected_actions: self.temp.setdefault(expected_path, {})['hidden'] = hidden if 'value' in self.temp[expected_path]: value = self.temp[expected_path]['value'] else: value = self.model[expected_path].get('value') self.set_dependencies(expected_path, value, hidden) def set_not_equal(self, path: str, value: Any) -> None: not_equal = self.form.get(path, {}).get('not_equal', {}) if not_equal: vals = [] opts = [] if isinstance(value, list): for val in value: vals.append(val) opts.append(path) else: vals.append(value) opts.append(path) for path_ in self.form[path]['not_equal']['options']: schema = self.get_schema(path_) p_value = self.get_value(path_) if isinstance(p_value, list): for val in p_value: vals.append(val) opts.append(path_) else: vals.append(p_value) opts.append(path_) equal = [] warnings_only = self.form[path]['not_equal'].get('warnings', False) if warnings_only: msg = _('should be different from the value of "{}"') msgcurr = _('value for {} should be different') else: msg = _('must be different from the value of "{}"') msgcurr = _('value for {} must be different') for idx_inf, val_inf in enumerate(vals): for idx_sup, val_sup in enumerate(vals[idx_inf + 1:]): if val_inf == val_sup is not None: for opt_ in [opts[idx_inf], opts[idx_inf + idx_sup + 1]]: if opt_ not in equal: equal.append(opt_) if equal: equal_name = {} for opt in equal: schema = self.get_schema(opt) equal_name[opt] = schema['title'] for opt_ in equal: display_equal = [] for opt__ in equal: if opt_ != opt__: display_equal.append(equal_name[opt_]) display_equal = ', '.join(display_equal) if opt_ == path: msg_ = msgcurr.format(display_equal) else: msg_ = msg.format(display_equal) if warnings_only: self.model[opt_].setdefault('warnings', []).append(msg_) else: self.model[opt_].setdefault('error', []).append(msg_) else: for opt in opts: if 'warnings' in self.model[opt]: del self.model[opt]['warnings'] if 'error' in self.model[opt]: del self.model[opt]['error'] def do_copy(self, path: str, value: Any) -> None: copy = self.form.get(path, {}).get('copy') if copy: for opt in copy: # FIXME follower! owner = self.get_owner(opt, None) if owner == 'default': # do not change in this.temp, it's default value self.model[opt]['value'] = value def send_data(self, updates): raise NotImplementedError('please implement send_data function')