"""Client-side, read/write view over a tiramisu-json document.

The module rebuilds a small subset of the tiramisu API (``config.option``,
``config.value``, ``config.property``) on top of the ``schema`` / ``model`` /
``form`` dictionaries of a tiramisu-json payload.  Local modifications are
kept in ``Config.temp`` and queued in ``Config.updates`` until they are sent
to the remote side through ``Config.send_data``.
"""
from typing import Optional, Dict, List, Any
from copy import copy
import warnings
import re


from .error import APIError, ValueWarning, ValueOptionError, ValueErrorWarning, PropertiesOptionError
from .setting import undefined
from .i18n import _


TIRAMISU_JSON_VERSION = '1.0'
# maps a schema "type" to the Python type accepted by TiramisuOptionValue.set
TYPE = {'boolean': bool,
        'integer': int,
        'string': str,
        'password': str,
        'domain': str}


class Option:
    """Fake option (IntOption, StrOption, ...), only useful for warnings."""

    def __init__(self,
                 path):
        self.path = path

    def __call__(self):
        # the warning objects receive this where a weakref is normally
        # supplied: calling the instance simply returns the instance
        return self

    def impl_getpath(self):
        """Return the path given at construction."""
        return self.path


class TiramisuOptionOption:
    """``config.option(path).option``: schema-level information."""

    def __init__(self,
                 config: 'Config',
                 path: str,
                 schema: Dict,
                 model: Dict,
                 form: Dict) -> None:
        self.config = config
        self._path = path
        self.schema = schema
        self.model = model
        self.form = form

    def doc(self):
        """Return the schema ``title``."""
        return self.schema['title']

    def path(self):
        """Return the full path of the option."""
        return self._path

    def name(self,
             follow_symlink: bool=False) -> str:
        """Return the last component of the path.

        With ``follow_symlink`` set and a symlink option, the name is taken
        from the schema ``opt_path`` instead of the option's own path.
        """
        if follow_symlink and \
                not self.isoptiondescription() and \
                self.issymlinkoption():
            path = self.schema['opt_path']
        else:
            path = self._path
        return path.rsplit('.', 1)[-1]

    def isoptiondescription(self):
        """True for schema types ``object`` and ``array``."""
        return self.schema['type'] in ['object', 'array']

    def isleadership(self):
        """True when the schema type is ``array``."""
        return self.schema['type'] == 'array'

    def isleader(self):
        """True when this option is the first property of an ``array`` parent."""
        if '.' in self._path:
            parent_schema = self.config.get_schema(self._path.rsplit('.', 1)[0])
            if parent_schema['type'] == 'array':
                leader = next(iter(parent_schema['properties'].keys()))
                return leader == self._path
        return False

    def isfollower(self):
        """Delegate to ``Config.isfollower`` for this path."""
        return self.config.isfollower(self._path)

    def issymlinkoption(self) -> bool:
        """True when the schema type is ``symlink``."""
        return self.schema['type'] == 'symlink'

    def ismulti(self) -> bool:
        """Return the schema ``isMulti`` flag (False when absent)."""
        return self.schema.get('isMulti', False)

    def issubmulti(self) -> bool:
        """Return the schema ``isSubMulti`` flag (False when absent)."""
        return self.schema.get('isSubMulti', False)

    def type(self) -> str:
        """Return 'leadership', 'optiondescription' or the option type.

        For a symlink the type of the schema found at ``opt_path`` is returned.
        """
        if self.isleadership():
            return 'leadership'
        if self.isoptiondescription():
            return 'optiondescription'
        if self.issymlinkoption():
            return self.config.get_schema(self.schema['opt_path'])['type']
        return self.schema['type']

    def properties(self) -> List[str]:
        """Return the properties computed by ``Config.get_properties``."""
        model = self.model.get(self._path, {})
        if self.isfollower():
            # NOTE(review): Config.is_hidden reads the follower defaults under
            # the 'null' key while this uses None -- confirm which key the
            # model really carries.
            model = model.get(None, {})
        return self.config.get_properties(model, self._path, None)

    def requires(self) -> None:
        # FIXME not implemented
        return None


class TiramisuOptionProperty:
    """``config.option(path).property``."""

    def __init__(self,
                 config: 'Config',
                 path: str,
                 index: Optional[int],
                 model: Dict) -> None:
        self.config = config
        self.path = path
        self.index = index
        self.model = model

    def get(self, only_raises=False):
        """Return the list of properties of the option.

        With ``only_raises`` the list only ever contains 'hidden'.
        """
        if only_raises:
            props = []
        else:
            # only_raises (False here) stops get_properties from adding
            # 'hidden' itself: it is appended just below
            props = self.config.get_properties(self.model, self.path, self.index, only_raises)
        if self.config.is_hidden(self.path,
                                 self.index):
            props.append('hidden')
        return props


class _Value:
    """Shared helpers for the ``value`` accessors.

    Subclasses must provide ``config``, ``schema``, ``model`` and ``path``.
    """

    def _dict_walk(self,
                   ret: Dict,
                   schema: Dict,
                   root: str,
                   fullpath: bool,
                   withwarning: bool) -> None:
        """Fill ``ret`` with ``{path: value}`` for every visible option.

        ``root`` and ``fullpath`` are only forwarded to the recursive calls.
        """
        leadership_len = None
        is_leadership = schema.get('type') == 'array'
        for key, option in schema['properties'].items():
            if self.config.is_hidden(key, None) is False:
                if option['type'] in ['object', 'array']:
                    # optiondescription or leadership
                    self._dict_walk(ret,
                                    option,
                                    root,
                                    fullpath,
                                    withwarning)
                elif is_leadership and leadership_len is not None:
                    # followers: one value per leader index
                    values = []
                    for index in range(leadership_len):
                        value = self.config.get_value(key,
                                                      index)
                        self._display_warnings(key, value, option['type'], option['name'], withwarning)
                        values.append(value)
                    ret[key] = values
                else:
                    value = self.config.get_value(key)
                    self._display_warnings(key, value, option['type'], option['name'], withwarning)
                    ret[key] = value
                    if is_leadership:
                        # first visible option of a leadership is the leader
                        leadership_len = len(value)
            elif is_leadership and leadership_len is None:
                # if leader is hidden, followers are hidden too
                break

    def dict(self,
             fullpath: bool=False,
             withwarning: bool=False):
        """Return ``{path: value}`` for all visible options under the schema."""
        ret = {}
        self._dict_walk(ret,
                        self.schema,
                        self.path,
                        fullpath,
                        withwarning)
        return ret

    def _display_warnings(self, path, value, type, name, withwarning=True):
        """Emit a ``ValueErrorWarning`` per model error (and warning).

        Entries of the model ``warnings`` list are only emitted when
        ``withwarning`` is true.  ``name`` is accepted but not used.
        """
        option_model = self.model.get(path, {})
        for err in option_model.get('error', []):
            warnings.warn_explicit(ValueErrorWarning(value,
                                                     type,
                                                     Option(path),
                                                     '{0}'.format(err)),
                                   ValueErrorWarning,
                                   self.__class__.__name__, 0)

        if withwarning and option_model.get('warnings'):
            for warn in option_model.get('warnings'):
                # the message is the warning itself (the original formatted
                # the leftover ``err`` of the previous loop)
                warnings.warn_explicit(ValueErrorWarning(value,
                                                         type,
                                                         Option(path),
                                                         '{0}'.format(warn)),
                                       ValueErrorWarning,
                                       self.__class__.__name__, 0)


class TiramisuOptionOwner:
    """``config.option(path).owner``."""

    def __init__(self,
                 config: 'Config',
                 schema: Dict,
                 model: Dict,
                 form: Dict,
                 temp: Dict,
                 path: str,
                 index: int) -> None:
        self.config = config
        self.schema = schema
        self.model = model
        self.form = form
        self.temp = temp
        self.path = path
        self.index = index

    def isdefault(self) -> Any:
        """True when the owner is 'default'."""
        return self.config.get_owner(self.path, self.index) == 'default'

    def get(self) -> str:
        """Return the owner as computed by ``Config.get_owner``."""
        return self.config.get_owner(self.path, self.index)


class TiramisuOptionValue(_Value):
    """``config.option(path).value``."""

    def __init__(self,
                 config: 'Config',
                 schema: Dict,
                 model: Dict,
                 form: Dict,
                 temp: Dict,
                 path: str,
                 index: int) -> None:
        self.config = config
        self.schema = schema
        self.model = model
        self.form = form
        self.temp = temp
        self.path = path
        self.index = index

    def get(self) -> Any:
        """Return the current value and emit the attached warnings.

        Raises:
            APIError: a follower is read without index, or an index is given
                for an option that is not a follower.
        """
        if self.config.isfollower(self.path):
            if self.index is None:
                raise APIError(_('index must be set with the follower option "{}"').format(self.path))
            value = self.config.get_value(self.path, self.index)
        else:
            if self.index is not None:
                raise APIError(_('index must only be set with a follower option, not for "{}"').format(self.path))
            value = self.config.get_value(self.path)
        self._display_warnings(self.path, value, self.schema['type'], self.schema['name'])
        return value

    def list(self):
        """Return the schema ``enum`` list."""
        return self.schema['enum']

    def _validate(self, type_, value):
        """Raise ``ValueError`` when ``value`` does not fit ``type_``.

        ``None`` and ``undefined`` are always accepted.
        """
        if value in [None, undefined]:
            return
        if type_ == 'choice':
            if value not in self.schema['enum']:
                raise ValueError('value {} is not in {}'.format(value, self.schema['enum']))
        elif not isinstance(value, TYPE[type_]):
            raise ValueError('value {} is not a valid {} '.format(value, type_))

    def set(self, value):
        """Validate ``value`` then register the modification in the config."""
        type_ = self.schema['type']
        remote = self.form.get(self.path, {}).get('remote', False)
        is_submulti = self.schema.get('isSubMulti', False)
        if self.index is None and self.schema.get('isMulti', False):
            if not isinstance(value, list):
                raise Exception('value must be a list')
            for val in value:
                if is_submulti:
                    for v in val:
                        self._validate(type_, v)
                else:
                    self._validate(type_, val)
        elif is_submulti:
            for val in value:
                self._validate(type_, val)
        else:
            self._validate(type_, value)
        self.config.modify_value(self.path,
                                 self.index,
                                 value,
                                 remote)
        self._display_warnings(self.path, value, type_, self.schema['name'])

    def reset(self):
        """Register the deletion of the value in the config."""
        remote = self.form.get(self.path, {}).get('remote', False)
        self.config.delete_value(self.path,
                                 self.index,
                                 remote)

    def default(self):
        """Return the schema ``value`` entry (None when absent)."""
        return self.schema.get('value')


class _Option:
    """Shared ``list`` implementation for option containers.

    Subclasses must provide ``config``, ``schema``, ``model``, ``form``,
    ``temp`` and ``index``.
    """

    def list(self,
             type='option'):
        """Yield the visible children matching ``type``.

        ``type`` is one of 'all', 'option' or 'optiondescription'.
        """
        if type not in ['all', 'option', 'optiondescription']:
            raise Exception('unknown list type {}'.format(type))
        for path, schema in self.schema['properties'].items():
            if self.config.is_hidden(path, None):
                continue
            if schema['type'] in ['object', 'array']:
                if type in ['all', 'optiondescription']:
                    yield TiramisuOptionDescription(self.config,
                                                    schema,
                                                    self.model,
                                                    self.form,
                                                    self.temp,
                                                    path)
            elif type in ['all', 'option']:
                yield TiramisuOption(self.config,
                                     schema,
                                     self.model,
                                     self.form,
                                     self.temp,
                                     path,
                                     self.index)


class TiramisuOptionDescription(_Option):
    """``config.option(path)`` when ``path`` is an option description."""

    def __init__(self,
                 config: 'Config',
                 schema: Dict,
                 model: Dict,
                 form: Dict,
                 temp: Dict,
                 path: str) -> None:
        self.config = config
        self.schema = schema
        self.model = model
        self.form = form
        self.temp = temp
        self.path = path
        self.index = None

    def __getattr__(self,
                    subfunc: str) -> Any:
        """Build the 'option', 'property' or 'value' accessor on demand."""
        if subfunc == 'option':
            return TiramisuOptionOption(self.config,
                                        self.path,
                                        self.schema,
                                        self.model,
                                        self.form)
        if subfunc == 'property':
            # the index argument was missing, which made this call fail
            return TiramisuOptionProperty(self.config,
                                          self.path,
                                          self.index,
                                          self.model.get(self.path, {}))
        if subfunc == 'value':
            return TiramisuOptionValue(self.config,
                                       self.schema,
                                       self.model,
                                       self.form,
                                       self.temp,
                                       self.path,
                                       self.index)
        raise APIError(_('please specify a valid sub function ({})').format(subfunc))

    def group_type(self):
        """Return 'default', or raise when the option description is hidden.

        Raises:
            PropertiesOptionError: the option description is hidden.
        """
        # the test was inverted: a visible description raised and a hidden
        # one answered 'default'
        if not self.config.is_hidden(self.path, None):
            # FIXME the real group type is not available
            return 'default'
        raise PropertiesOptionError(None, None, None, opt_type='optiondescription')


class TiramisuOption:
    """``config.option(path)`` when ``path`` is an option."""

    def __init__(self,
                 config: 'Config',
                 schema: Dict,
                 model: Dict,
                 form: Dict,
                 temp: Dict,
                 path: str,
                 index: Optional[int]) -> None:
        self.config = config
        self.schema = schema
        self.model = model
        self.form = form
        self.temp = temp
        self.path = path
        self.index = index

    def __getattr__(self,
                    subfunc: str) -> Any:
        """Build the 'option', 'value', 'owner' or 'property' accessor."""
        if subfunc == 'option':
            if self.index is not None:
                raise NotImplementedError()
            return TiramisuOptionOption(self.config,
                                        self.path,
                                        self.schema,
                                        self.model,
                                        self.form)
        if subfunc == 'value':
            return TiramisuOptionValue(self.config,
                                       self.schema,
                                       self.model,
                                       self.form,
                                       self.temp,
                                       self.path,
                                       self.index)
        if subfunc == 'owner':
            return TiramisuOptionOwner(self.config,
                                       self.schema,
                                       self.model,
                                       self.form,
                                       self.temp,
                                       self.path,
                                       self.index)
        if subfunc == 'property':
            return TiramisuOptionProperty(self.config,
                                          self.path,
                                          self.index,
                                          self.model.get(self.path, {}))
        raise APIError(_('please specify a valid sub function ({})').format(subfunc))


class TiramisuContextProperty:
    """``config.property``."""

    def get(self):
        # FIXME ? the list is hard-coded
        return ['demoting_error_warning']


class ContextOption(_Option):
    """``config.option``."""

    def __init__(self,
                 config: 'Config',
                 model: Dict,
                 form: Dict,
                 schema: Dict,
                 temp: Dict) -> None:
        self.config = config
        self.model = model
        self.form = form
        # wrapped so that _Option.list can iterate the root level
        self.schema = {'properties': schema}
        self.temp = temp
        self.index = None

    def __call__(self,
                 path: str,
                 index: Optional[int]=None) -> TiramisuOption:
        """Return the accessor matching the schema type found at ``path``."""
        schema = self.config.get_schema(path)
        if schema['type'] in ['object', 'array']:
            return TiramisuOptionDescription(self.config,
                                             schema,
                                             self.model,
                                             self.form,
                                             self.temp,
                                             path)
        return TiramisuOption(self.config,
                              schema,
                              self.model,
                              self.form,
                              self.temp,
                              path,
                              index)


class ContextValue(_Value):
    """``config.value``."""

    def __init__(self,
                 config: 'Config',
                 model: Dict,
                 form: Dict,
                 schema: Dict,
                 temp: Dict) -> None:
        self.config = config
        self.model = model
        self.form = form
        first = next(iter(schema.keys()))
        self.path = first.rsplit('.', 1)[0]
        self.schema = {'properties': schema}
        self.temp = temp

    def __call__(self) -> TiramisuOptionValue:
        # ``path`` and ``index`` were undefined names here; the context
        # level has its own path and no index
        return TiramisuOptionValue(self.config,
                                   self.schema,
                                   self.model,
                                   self.form,
                                   self.temp,
                                   self.path,
                                   None)

    def mandatory(self):
        """Yield the paths of required options whose value is empty."""
        for key, value in self.dict().items():
            if self.config.isfollower(key):
                # FIXME test with index
                # NOTE(review): Config.is_hidden reads 'null' where this
                # reads None -- confirm the key used by the model.
                if self.model.get(key, {}).get(None, {}).get('required'):
                    if self.config.get_schema(key).get('isSubMulti'):
                        for val in value:
                            if not val or None in val or '' in val:
                                yield key
                                break
                    elif None in value or '' in value:
                        yield key
            elif value is None or \
                    (self.config.get_schema(key).get('isMulti') and
                     (not value or None in value or '' in value)):
                yield key


class Config:
    """``config``: entry point built from a tiramisu-json document."""

    def __init__(self,
                 json):
        """Load ``json`` (a dict with version, model, form and schema).

        Raises:
            Exception: the ``version`` entry is not TIRAMISU_JSON_VERSION.
        """
        if json.get('version') != TIRAMISU_JSON_VERSION:
            raise Exception('incompatible tiramisu-json format version (got {}, expected {})'.format(json.get('version', '0.0'), TIRAMISU_JSON_VERSION))
        self.model = json['model']
        self.form = json['form']
        # support pattern: compiled once, in place, in the form
        for key, option in json['form'].items():
            if key != 'null' and 'pattern' in option:
                option['pattern'] = re.compile(option['pattern'])
        self.temp = {}
        self.schema = json['schema']
        self.updates = []
        first_path = next(iter(self.schema.keys()))
        if '.' in first_path:
            self.root = first_path.rsplit('.', 1)[0]
        else:
            self.root = ''

    def __getattr__(self,
                    subfunc: str) -> Any:
        """Build the 'property', 'option' or 'value' accessor on demand."""
        if subfunc == 'property':
            return TiramisuContextProperty()
        if subfunc == 'option':
            return ContextOption(self,
                                 self.model,
                                 self.form,
                                 self.schema,
                                 self.temp)
        if subfunc == 'value':
            return ContextValue(self,
                                self.model,
                                self.form,
                                self.schema,
                                self.temp)
        raise APIError(_('please specify a valid sub function ({})').format(subfunc))

    def add_value(self,
                  path: str,
                  index: Optional[int],
                  value: Any,
                  remote: bool) -> None:
        """Register an 'add' update."""
        self.updates_value('add',
                           path,
                           index,
                           value,
                           remote,
                           None)

    def modify_value(self,
                     path: str,
                     index: Optional[int],
                     value: Any,
                     remote: bool) -> None:
        """Register a 'modify' update.

        A trailing ``undefined`` in a list value is replaced, in place, by
        the schema ``defaultvalue`` or else by the schema ``value`` item at
        the same position (None when there is none).
        """
        schema = self.get_schema(path)
        if value and isinstance(value, list) and value[-1] is undefined:
            new_value = schema.get('defaultvalue')
            if new_value is None:
                len_value = len(value)
                schema_value = schema.get('value', [])
                if len(schema_value) >= len_value:
                    new_value = schema_value[len_value - 1]
            value[-1] = new_value

        self.updates_value('modify',
                           path,
                           index,
                           value,
                           remote,
                           None)

    def delete_value(self,
                     path: str,
                     index: Optional[int],
                     remote: bool) -> None:
        """Register a 'delete' update."""
        self.updates_value('delete',
                           path,
                           index,
                           None,
                           remote,
                           None)

    def get_properties(self,
                       model,
                       path,
                       index,
                       only_raises=True):
        """Translate model/form flags into a list of property names.

        'hidden' is only added when ``only_raises`` is true.
        """
        props = model.get('properties', [])[:]
        if model.get('required'):
            if self.get_schema(path).get('isMulti', False) and not self.isfollower(path):
                props.append('empty')
            else:
                props.append('mandatory')
        if model.get('needs_len'):
            props.append('mandatory')
        if model.get('readOnly'):
            props.append('frozen')
        if only_raises and self.is_hidden(path,
                                          index):
            props.append('hidden')
        if self.form.get(path, {}).get('clearable'):
            props.append('clearable')
        return props

    def get_schema(self,
                   path):
        """Return the schema node of ``path``.

        The schema is keyed by full path at every level, so the walk rebuilds
        the path component by component starting from ``self.root``.

        Raises:
            Exception: ``path`` does not start with the root path.
            KeyError: a component is not found in the schema.
        """
        root_path = self.root
        schema = {'properties': self.schema,
                  'type': 'object'}
        if root_path:
            root = self.root.split('.')
            if not path.startswith(self.root):
                raise Exception('cannot find {0}'.format(path))
            subpaths = path.split('.')[len(root):]
        else:
            subpaths = path.split('.')
        for subpath in subpaths:
            if root_path:
                root_path += '.' + subpath
            else:
                root_path = subpath
            schema = schema['properties'][root_path]
        return schema

    def isfollower(self,
                   path: str) -> bool:
        """True when the parent is an ``array`` and ``path`` is not its first property."""
        if '.' in path:
            parent_schema = self.get_schema(path.rsplit('.', 1)[0])
            leader = next(iter(parent_schema['properties'].keys()))
            if parent_schema['type'] == 'array' and leader != path:
                return True
        return False

    def is_hidden(self,
                  path: str,
                  index: Optional[int]) -> bool:
        """Tell whether ``path`` (optionally at ``index``) is hidden.

        An option is hidden when 'hidden' is True or 'display' is False.  A
        local override in ``temp`` wins over the model; for a follower the
        model flags are read under the 'null' key, then under the index key.
        """
        index_key = str(index)
        option_model = self.model.get(path, {})
        for property_, needs in {'hidden': True, 'display': False}.items():
            if property_ in self.temp.get(path, {}):
                # compared to ``needs`` so that a 'display' override keeps
                # the right polarity
                return self.temp[path][property_] == needs
            if self.isfollower(path):
                if option_model.get('null', {}).get(property_, None) == needs:
                    return True
            elif option_model.get(property_, None) == needs:
                return True
            if index_key != 'None' and index_key in option_model and \
                    option_model.get(index_key, {}).get(property_, None) == needs:
                return True
        return False

    def get_value(self,
                  path: str,
                  index: Optional[int]=None) -> Any:
        """Return the value of ``path``, local changes first, then the model.

        A symlink is resolved to its ``opt_path``.  For a hidden follower a
        ``PropertiesOptionError`` instance is returned (not raised).
        """
        schema = self.get_schema(path)
        if schema['type'] == 'symlink':
            path = schema['opt_path']
            schema = self.get_schema(path)
        if index is None:
            if 'value' in self.temp.get(path, {}):
                value = self.temp[path]['value']
            else:
                value = self.model.get(path, {}).get('value')
            if value is None and schema.get('isMulti', False):
                value = []
        else:
            index = str(index)
            # ``value`` first holds the per-index container (or None when
            # the value was deleted locally)
            if 'delete' in self.temp.get(path, {}):
                value = None
            elif index in self.temp.get(path, {}) and 'delete' in self.temp[path][index]:
                value = None
            elif 'value' in self.temp.get(path, {}).get(index, {}):
                value = self.temp[path]
            else:
                value = self.model.get(path)
            if self.isfollower(path):
                if self.is_hidden(path, index):
                    value = PropertiesOptionError(None, None, None, opt_type='option')
                elif value is not None and 'value' in value.get(index, {}):
                    value = value[index]['value']
                else:
                    value = schema.get('default')
            else:
                if value is not None and index in value and 'value' in value[index]:
                    value = value[index]['value']
                else:
                    value = schema.get('default')
            if value is None and schema.get('isSubMulti', False):
                value = []
        return value

    def get_owner(self,
                  path: str,
                  index: int) -> str:
        """Return the owner of ``path`` ('default' when unknown).

        Raises:
            PropertiesOptionError: ``path`` is a hidden follower.
        """
        if not self.isfollower(path):
            if 'owner' in self.temp.get(path, {}):
                owner = self.temp[path]['owner']
            else:
                owner = self.model.get(path, {}).get('owner', 'default')
        else:
            # NOTE(review): follower changes are stored in temp under the
            # index key, so this 'value' test may never match -- to confirm.
            if 'value' in self.temp.get(path, {}):
                value = self.temp[path]
            else:
                value = self.model.get(path, {})
            index = str(index)
            if self.is_hidden(path, index):
                raise PropertiesOptionError(None, None, None, opt_type='option')
            if index in value:
                owner = value[index]['owner']
            else:
                owner = 'default'
        return owner

    def _follower_paths(self,
                        path: str) -> List[str]:
        """Return every property path of the parent of ``path`` but the first."""
        parent_schema = self.get_schema(path.rsplit('.', 1)[0])
        return list(parent_schema['properties'].keys())[1:]

    def updates_value(self,
                      action: str,
                      path: str,
                      index: Optional[int],
                      value: Optional[Any],
                      remote: bool,
                      leadership: Optional[str]) -> None:
        """Queue an update, then send it (remote) or apply it locally.

        The update is merged with the last queued one when both target the
        same path/index and the actions are compatible.  When the form has a
        ``pattern`` for the path and the value does not match, nothing is
        sent nor applied.
        """
        # --- 1. merge with, or clean up, the last queued update
        update_last_action = False
        if self.updates:
            last_body = self.updates[-1]
            if last_body['name'] == path:
                if index is None and 'index' not in last_body:
                    last_action = last_body['action']
                    if last_action == action or \
                            (last_action in ['delete', 'modify'] and
                             action in ['delete', 'modify']):
                        update_last_action = True
                elif index is None and action == 'delete':
                    # drop the trailing updates made obsolete by the delete
                    while self.updates:
                        update = self.updates[-1]
                        if (leadership is None and update['name'] == path) or \
                                (leadership and path.startswith(leadership + '.')):
                            del self.updates[-1]
                        else:
                            break
                elif last_body.get('index') == index:
                    # .get(): the last update may have no index at all
                    if last_body['action'] == 'add' and action == 'modify':
                        action = 'add'
                        update_last_action = True
                    elif (last_body['action'] == action and action != 'delete') or \
                            (last_body['action'] == 'modify' and action == 'delete'):
                        update_last_action = True
                    elif last_body['action'] == 'add' and action == 'delete':
                        del self.updates[-1]

        # --- 2. record the update
        if update_last_action:
            if action == 'delete' and value is None:
                if 'value' in last_body:
                    del last_body['value']
            else:
                last_body['value'] = value
            if index is None and 'index' in last_body:
                del last_body['index']
            last_body['action'] = action
        else:
            data = {'action': action,
                    'name': path}
            if action != 'delete' and value is not None:
                data['value'] = value
            if index is not None:
                data['index'] = index
            self.updates.append(data)

        # --- 3. pattern validation
        if 'pattern' in self.form.get(path, {}) and \
                (not isinstance(value, list) or undefined not in value):
            match = self.test_value(path, value, remote)
        else:
            match = True
        if not match:
            return

        # --- 4. send or apply locally
        if remote:
            self.updates_data(self.send_data({'updates': self.updates,
                                              'model': self.model}))
            return
        if action == 'delete':
            if index is None:
                # leader or standard option
                # set value to default value
                value = self.default_value(path)
                self.temp[path] = {'value': value, 'owner': 'default'}
                if self.option(path).option.isleader():
                    # if leader, delete all follower values
                    for follower in self._follower_paths(path):
                        self.temp[follower] = {'delete': True}
            elif self.option(path).option.isleader():
                # an indexed leader value is removed; work on a copy so the
                # list stored in the model is left untouched
                old_value = list(self.option(path).value.get())
                old_value.pop(index)
                self.temp[path] = {'value': old_value, 'owner': 'tmp'}
                for follower in self._follower_paths(path):
                    # remove value for this index
                    # FIXME the follower length is not reduced!
                    self.temp.setdefault(follower, {})[str(index)] = {'delete': True}
            else:
                # it's a follower with index
                self.temp.setdefault(path, {})[str(index)] = {'delete': True}
        elif index is None:
            # set a value for a not follower option
            self.temp[path] = {'value': value, 'owner': 'tmp'}
        else:
            # set a value for a follower option
            self.temp.setdefault(path, {})[str(index)] = {'value': value, 'owner': 'tmp'}
        self.set_dependencies(path, value)
        self.set_not_equal(path, value)
        self.do_copy(path, value)

    def default_value(self, path):
        """Return the schema ``value`` of ``path`` ([] for an unset multi)."""
        schema = self.get_schema(path)
        value = schema.get('value')
        if value is None and schema.get('isMulti', False):
            value = []
        return value

    def updates_data(self, data):
        """Reset the local state and adopt ``data['model']`` as the model."""
        self.updates = []
        self.temp.clear()
        self.model = data['model']

    def test_value(self,
                   path: str,
                   value: Any,
                   remote: bool):
        """Check ``value`` (or every item of a list) against the form pattern.

        Returns a truthy result on match.  For a local (not ``remote``)
        option, a temp ``error`` entry is set on failure and removed on
        success.
        """
        if isinstance(value, list):
            for val in value:
                if not self.test_value(path, val, remote):
                    return False
            return True
        if value is None:
            match = True
        else:
            match = self.form[path]['pattern'].search(value)
        if not remote:
            if not match:
                self.temp.setdefault(path, {})['error'] = ['']
            else:
                # the error lives in temp: the original tested the model
                # before deleting from temp, which could raise KeyError
                self.temp.get(path, {}).pop('error', None)
        return match

    def set_dependencies(self,
                         path: str,
                         ori_value: Any,
                         force_hide: bool=False) -> None:
        """Hide or show the options depending on the value of ``path``.

        The form ``dependencies`` entry gives, per expected value (or by
        default), the paths to 'hide' and to 'show'.  The result is written
        to ``temp`` and propagated recursively; with ``force_hide`` every
        listed path is hidden.
        """
        dependencies = self.form.get(path, {}).get('dependencies', {})
        if not dependencies:
            return
        if ori_value in dependencies['expected']:
            expected = dependencies['expected'][ori_value]
        else:
            expected = dependencies['default']
        for action in ['hide', 'show']:
            expected_actions = expected.get(action)
            if not expected_actions:
                continue
            hidden = True if force_hide else action == 'hide'
            for expected_path in expected_actions:
                self.temp.setdefault(expected_path, {})['hidden'] = hidden
                if 'value' in self.temp[expected_path]:
                    value = self.temp[expected_path]['value']
                else:
                    # a path may be missing from the model
                    value = self.model.get(expected_path, {}).get('value')
                self.set_dependencies(expected_path, value, hidden)

    def set_not_equal(self,
                      path: str,
                      value: Any) -> None:
        """Apply the form ``not_equal`` constraint of ``path``.

        When two non-None values are equal among ``value`` and the values of
        the listed options, a message is appended to the model 'warnings'
        (when the constraint has ``warnings`` set) or 'error' list of every
        option involved; otherwise both lists are removed.
        """
        not_equal = self.form.get(path, {}).get('not_equal', {})
        if not not_equal:
            return
        # vals[i] is a value and opts[i] the path it belongs to
        vals = []
        opts = []
        for opt_path, opt_value in [(path, value)] + \
                [(path_, self.get_value(path_)) for path_ in not_equal['options']]:
            if isinstance(opt_value, list):
                vals.extend(opt_value)
                opts.extend([opt_path] * len(opt_value))
            else:
                vals.append(opt_value)
                opts.append(opt_path)
        warnings_only = not_equal.get('warnings', False)
        if warnings_only:
            msg = _('should be different from the value of "{}"')
            msgcurr = _('value for {} should be different')
        else:
            msg = _('must be different from the value of "{}"')
            msgcurr = _('value for {} must be different')
        equal = []
        for idx_inf, val_inf in enumerate(vals):
            for idx_sup in range(idx_inf + 1, len(vals)):
                val_sup = vals[idx_sup]
                if val_sup is not None and val_inf == val_sup:
                    for opt_ in [opts[idx_inf], opts[idx_sup]]:
                        if opt_ not in equal:
                            equal.append(opt_)
        if equal:
            equal_name = {opt: self.get_schema(opt)['title'] for opt in equal}
            key = 'warnings' if warnings_only else 'error'
            for opt_ in equal:
                # titles of the *other* options (the original repeated the
                # title of opt_ itself)
                display_equal = ', '.join(equal_name[opt__]
                                          for opt__ in equal
                                          if opt_ != opt__)
                if opt_ == path:
                    msg_ = msgcurr.format(display_equal)
                else:
                    msg_ = msg.format(display_equal)
                self.model.setdefault(opt_, {}).setdefault(key, []).append(msg_)
        else:
            for opt in opts:
                option_model = self.model.get(opt, {})
                option_model.pop('warnings', None)
                option_model.pop('error', None)

    def do_copy(self,
                path: str,
                value: Any) -> None:
        """Copy ``value`` to the form ``copy`` targets still owned by 'default'."""
        targets = self.form.get(path, {}).get('copy')
        if targets:
            for opt in targets:
                # FIXME follower!
                owner = self.get_owner(opt, None)
                if owner == 'default':
                    # do not change in self.temp, it's default value
                    self.model.setdefault(opt, {})['value'] = value

    def send_data(self,
                  updates):
        """Send ``updates`` to the remote side; must be overridden.

        ``updates_value`` passes the returned dict to ``updates_data``, which
        reads its 'model' key.
        """
        raise NotImplementedError('please implement send_data function')