tiramisu-api-python/tiramisu_json_api/api.py

809 lines
29 KiB
Python

from typing import Optional, Dict, List, Any
import warnings
import re
from .error import APIError, ValueWarning, ValueOptionError, ValueErrorWarning, PropertiesOptionError
from .setting import undefined
from .i18n import _
# Python type that a value must be an instance of, keyed by JSON schema type.
# Used for the ``isinstance`` check done before a value is stored.
TYPE = dict(
    boolean=bool,
    number=int,
    string=str,
    password=str,
    domain=str,
)
class Option:
    """Fake option (stand-in for IntOption, StrOption, ...).

    Only useful for building warnings: it stores nothing but the option path.
    """

    def __init__(self, path):
        self.path = path

    def __call__(self):
        """Return the object itself (the caller expects a weakref-like callable)."""
        return self

    def impl_getpath(self):
        """Return the path given at construction time."""
        return self.path
class TiramisuOptionOption:
    """Static description of one option: ``config.option(path).option``.

    Args:
        path: full path of the option.
        schema: schema entry of this option; the methods read its ``title``,
            ``type``, ``isMulti`` and (for symlinks) ``opt_path`` keys.
        model: model entries keyed by option path.
        form: form entries keyed by option path.
        config: optional object exposing ``get_hidden(path)``.  When it is
            not available the ``hidden`` flag is read from ``model`` instead.
    """

    # schema type -> type name reported by type()
    _TYPE_NAMES = {'number': 'int',
                   'choice': 'choice',
                   'boolean': 'bool',
                   'password': 'password',
                   'date': 'date',
                   'domain': 'domainname',
                   'url': 'url',
                   'username': 'username',
                   'string': 'str',
                   'symlink': 'symlink'}

    def __init__(self,
                 path: str,
                 schema: Dict,
                 model: Dict,
                 form: Dict,
                 config: Optional['Config'] = None) -> None:
        self._path = path
        self.schema = schema
        self.model = model
        self.form = form
        self.config = config

    def doc(self):
        """Return the option title stored in the schema."""
        return self.schema['title']

    def path(self):
        """Return the full path of the option."""
        return self._path

    def name(self,
             follow_symlink: bool = False) -> str:
        """Return the last component of the option path.

        With ``follow_symlink`` set and a symlink option, the name is taken
        from the schema's ``opt_path`` instead of the option's own path.
        """
        if follow_symlink and \
                not self.isoptiondescription() and \
                self.issymlinkoption():
            path = self.schema['opt_path']
        else:
            path = self._path
        return path.rsplit('.', 1)[-1]

    def isoptiondescription(self):
        """Return True when the schema type is ``object`` or ``array``."""
        return self.schema['type'] in ['object', 'array']

    def ismasterslaves(self):
        """Return True when the schema type is ``array``."""
        return self.schema['type'] == 'array'

    def issymlinkoption(self) -> bool:
        """Return True when the schema type is ``symlink``."""
        return self.schema['type'] == 'symlink'

    def ismulti(self) -> bool:
        """Return the schema's ``isMulti`` flag (False when absent)."""
        return self.schema.get('isMulti', False)

    def type(self) -> str:
        """Return the type name matching the schema type.

        Raises:
            Exception: the schema type has no known equivalent.
        """
        if self.ismasterslaves():
            return 'masterslaves'
        if self.isoptiondescription():
            return 'optiondescription'
        schema_type = self.schema['type']
        if schema_type in self._TYPE_NAMES:
            return self._TYPE_NAMES[schema_type]
        raise Exception('unsupported type {}'.format(schema_type))

    def properties(self) -> List[str]:
        """Return the property names deduced from the model and the form."""
        props = []
        model = self.model.get(self._path, {})
        if model.get('required'):
            # FIXME 'empty', 'needs_len'
            props.append('mandatory')
        if model.get('readOnly'):
            props.append('frozen')
        # The previous code dereferenced an undefined name (``this``) and a
        # ``config`` attribute that was never set, so it always raised.
        if self.config is not None:
            hidden = self.config.get_hidden(self._path)
        else:
            hidden = model.get('hidden')
        if hidden:
            props.append('hidden')
        if self.form.get(self._path, {}).get('clearable'):
            props.append('clearable')
        return props

    def requires(self) -> None:
        # FIXME not implemented yet
        return None
class TiramisuOptionProperty:
    """Properties of one option: ``config.option(path).property``.

    Args:
        path: full path of the option.
        model: model entry of this option (a single dict, not the whole
            model mapping).
        config: optional object exposing ``get_hidden(path)``.  When it is
            not available the ``hidden`` flag is read from ``model`` instead.
    """

    def __init__(self,
                 path: str,
                 model: Dict,
                 config: Optional['Config'] = None) -> None:
        self.path = path
        self.model = model
        self.config = config

    def get(self, only_raises=False):
        """Return the list of property names of the option.

        Args:
            only_raises: when true, skip the model's ``properties``,
                ``required`` and ``readOnly`` entries and report only
                ``hidden``.
        """
        # FIXME slaves: for a multi option that is not a slave, 'empty'
        # should map to ``required`` and 'mandatory' to ``needs_len``.
        properties = []
        if not only_raises:
            # copy, so that the model's own list is never modified
            properties = list(self.model.get('properties', []))
            if self.model.get('required', False):
                properties.append('mandatory')
            if self.model.get('readOnly', False):
                properties.append('frozen')
        # ``config`` used to be read without ever being set, so this call
        # always raised AttributeError.
        if self.config is not None:
            hidden = self.config.get_hidden(self.path)
        else:
            hidden = self.model.get('hidden')
        if hidden:
            properties.append('hidden')
        return properties
class _Value:
def _dict_walk(self,
ret: Dict,
schema: Dict,
root: str,
fullpath: bool,
withwarning: bool):
for key, option in schema['properties'].items():
hidden = self.temp.get(key, {}).get('hidden', None)
model_hidden = not self.model.get(key, {}).get('hidden', False) and \
self.model.get(key, {}).get('display', True)
if hidden is False or (hidden is None and model_hidden):
if option['type'] == 'object':
self._dict_walk(ret,
option,
root,
fullpath,
withwarning)
else:
value = self.config.get_value(key, schema)
self._display_warnings(key, value, option['type'], option['name'], withwarning)
ret[key] = value
def dict(self,
fullpath: bool=False,
withwarning: bool=False):
ret = {}
self._dict_walk(ret,
self.schema,
self.path,
fullpath,
withwarning)
return ret
def _display_warnings(self, path, value, type, name, withwarning=True):
for err in self.model.get(path, {}).get('error', []):
warnings.warn_explicit(ValueOptionError(value,
type,
Option(path),
'{0}'.format(err)),
ValueErrorWarning,
self.__class__.__name__, 0)
if withwarning and self.model.get(path, {}).get('warnings'):
for warn in self.model.get(path, {}).get('warnings'):
# FIXME ValueWarning needs value and type attribute!
warnings.warn_explicit(ValueWarning('{0}'.format(warn),
Option(path)),
ValueWarning,
self.__class__.__name__, 0)
class TiramisuOptionOwner:
    """Owner information of one option: ``config.option(path).owner``."""

    def __init__(self,
                 config: 'Config',
                 schema: Dict,
                 model: List[Dict],
                 form: List[Dict],
                 temp: List[Dict],
                 path: str,
                 index: int) -> None:
        self.config, self.path, self.index = config, path, index
        self.schema, self.model = schema, model
        self.form, self.temp = form, temp

    def isdefault(self) -> Any:
        """Tell whether the owner reported by the config is ``'default'``."""
        owner = self.config.get_owner(self.path)
        return owner == 'default'
class TiramisuOptionValue(_Value):
    """Value access for one option: ``config.option(path).value``."""

    def __init__(self,
                 config: 'Config',
                 schema: Dict,
                 model: List[Dict],
                 form: List[Dict],
                 temp: List[Dict],
                 path: str,
                 index: int) -> None:
        self.config = config
        self.schema = schema
        self.model = model
        self.form = form
        self.temp = temp
        self.path = path
        self.index = index

    def get(self) -> Any:
        """Return the current value, emitting the errors/warnings of the model."""
        value = self.config.get_value(self.path, self.schema)
        self._display_warnings(self.path, value, self.schema['type'], self.schema['name'])
        return value

    def list(self):
        """Return the allowed values (the schema's ``enum`` entry)."""
        return self.schema['enum']

    def _validate(self, type_, value):
        """Check a single value against the schema type.

        ``None`` and ``undefined`` are always accepted.  A ``choice`` must be
        one of the schema's ``enum`` values; other types are checked against
        ``TYPE``.  Schema types missing from ``TYPE`` are not checked here
        (they used to raise a bare ``KeyError``).

        Raises:
            Exception: the value does not match the type.
        """
        if value in [None, undefined]:
            return
        if type_ == 'choice':
            if value not in self.schema['enum']:
                raise Exception('value {} is not in {}'.format(value, self.schema['enum']))
            return
        expected = TYPE.get(type_)
        if expected is not None and not isinstance(value, expected):
            raise Exception('value {} is not a valid {} '.format(value, type_))

    def set(self, value):
        """Validate ``value`` and record it through the config.

        A multi option (schema ``isMulti``) requires a list, each item of
        which is validated separately.

        Raises:
            Exception: ``value`` is not a list for a multi option, or a value
                fails validation.
        """
        type_ = self.schema['type']
        if self.schema.get('isMulti', False):
            if not isinstance(value, list):
                raise Exception('value must be a list')
            for val in value:
                self._validate(type_, val)
        else:
            self._validate(type_, value)
        remote = self.form.get(self.path, {}).get('remote', False)
        self.config.modify_value(self.path,
                                 self.index,
                                 value,
                                 remote)
        self._display_warnings(self.path, value, type_, self.schema['name'])

    def reset(self):
        """Ask the config to delete the value of this option."""
        remote = self.form.get(self.path, {}).get('remote', False)
        self.config.delete_value(self.path,
                                 self.index,
                                 remote)
class _Option:
def list(self,
type='option'):
if type not in ['all', 'option']:
raise NotImplementedError()
for path, schema in self.schema['properties'].items():
if type == 'all' or schema['type'] not in ['object', 'array']:
hidden = self.temp.get(path, {}).get('hidden', None)
model_hidden = not self.model.get(path, {}).get('hidden', False) and \
self.model.get(path, {}).get('display', True)
if hidden is False or (hidden is None and model_hidden is True):
if schema['type'] in ['object', 'array']:
yield TiramisuOptionDescription(self.config,
schema,
self.model,
self.form,
self.temp,
path)
else:
yield TiramisuOption(self.config,
schema,
self.model,
self.form,
self.temp,
path,
self.index)
class TiramisuOptionDescription(_Option):
    """API of an option description: ``config.option(path)`` when ``path``
    designates a schema entry of type ``object`` or ``array``.

    The ``option``, ``property`` and ``value`` sub-APIs are built on demand
    by ``__getattr__``.
    """

    def __init__(self,
                 config: 'Config',
                 schema: Dict,
                 model: List[Dict],
                 form: List[Dict],
                 temp: List[Dict],
                 path: str) -> None:
        self.config = config
        self.schema = schema
        self.model = model
        self.form = form
        self.temp = temp
        self.path = path
        self.index = None

    def __getattr__(self,
                    subfunc: str) -> Any:
        """Build the requested sub-API.

        Raises:
            APIError: ``subfunc`` is not ``option``, ``property`` or ``value``.
        """
        if subfunc == 'option':
            option = TiramisuOptionOption(self.path,
                                          self.schema,
                                          self.model,
                                          self.form)
            # The helper reads its ``config`` attribute for the hidden state
            # and this constructor call does not supply it: attach it here.
            option.config = self.config
            return option
        if subfunc == 'property':
            prop = TiramisuOptionProperty(self.path,
                                          self.model.get(self.path, {}))
            # same as above: ``get()`` looks the hidden state up via ``config``
            prop.config = self.config
            return prop
        if subfunc == 'value':
            return TiramisuOptionValue(self.config,
                                       self.schema,
                                       self.model,
                                       self.form,
                                       self.temp,
                                       self.path,
                                       self.index)
        raise APIError(_('please specify a valid sub function ({})').format(subfunc))

    def group_type(self):
        """Return the group type of a visible option description.

        Raises:
            PropertiesOptionError: the option description is hidden.
        """
        # a 'hidden' flag in ``temp`` takes precedence over the model
        hidden = self.temp.get(self.path, {}).get('hidden', None)
        model_hidden = not self.model.get(self.path, {}).get('hidden', False) and \
            self.model.get(self.path, {}).get('display', True)
        if hidden is False or (hidden is None and model_hidden):
            # FIXME always reports 'default'
            return 'default'
        raise PropertiesOptionError(None, None, None, opt_type='optiondescription')
class TiramisuOption:
    """API of a single option: ``config.option(path)`` when ``path``
    designates a plain option.

    The ``option``, ``value``, ``owner`` and ``property`` sub-APIs are built
    on demand by ``__getattr__``.
    """

    def __init__(self,
                 config: 'Config',
                 schema: Dict,
                 model: List[Dict],
                 form: List[Dict],
                 temp: List[Dict],
                 path: str,
                 index: Optional[int]) -> None:
        self.config = config
        self.schema = schema
        self.model = model
        self.form = form
        self.temp = temp
        self.path = path
        self.index = index

    def __getattr__(self,
                    subfunc: str) -> Any:
        """Build the requested sub-API.

        Raises:
            NotImplementedError: ``option`` or ``property`` requested while an
                index is set.
            APIError: ``subfunc`` is not a known sub-API.
        """
        if subfunc == 'option':
            if self.index is not None:
                raise NotImplementedError()
            option = TiramisuOptionOption(self.path,
                                          self.schema,
                                          self.model,
                                          self.form)
            # The helper reads its ``config`` attribute for the hidden state
            # and this constructor call does not supply it: attach it here.
            option.config = self.config
            return option
        if subfunc == 'value':
            return TiramisuOptionValue(self.config,
                                       self.schema,
                                       self.model,
                                       self.form,
                                       self.temp,
                                       self.path,
                                       self.index)
        if subfunc == 'owner':
            return TiramisuOptionOwner(self.config,
                                       self.schema,
                                       self.model,
                                       self.form,
                                       self.temp,
                                       self.path,
                                       self.index)
        if subfunc == 'property':
            if self.index is not None:
                raise NotImplementedError()
            prop = TiramisuOptionProperty(self.path,
                                          self.model.get(self.path, {}))
            # same as above: ``get()`` looks the hidden state up via ``config``
            prop.config = self.config
            return prop
        raise APIError(_('please specify a valid sub function ({})').format(subfunc))
class TiramisuContextProperty:
    """Context-level properties: ``config.property``.

    The class takes no constructor argument and keeps no state.
    """

    def get(self):
        """Return the context properties (a fixed list for now)."""
        # FIXME should this come from the JSON?
        context_properties = ['demoting_error_warning']
        return context_properties
class ContextOption(_Option):
    """Entry point to options: ``config.option``.

    Calling the object with a path returns the API object of that path.  The
    top-level schema is wrapped in a ``{'properties': ...}`` dict.
    """

    def __init__(self,
                 config: 'Config',
                 model: Dict,
                 form: Dict,
                 schema: Dict,
                 temp: Dict) -> None:
        self.config, self.index = config, None
        self.model, self.form, self.temp = model, form, temp
        self.schema = {'properties': schema}

    def __call__(self,
                 path: str,
                 index: Optional[int]=None) -> TiramisuOption:
        """Return the API object for ``path``.

        Schema entries of type ``object`` or ``array`` give a
        ``TiramisuOptionDescription`` (``index`` is then ignored); anything
        else gives a ``TiramisuOption``.
        """
        schema = self.config.get_schema(path)
        if schema['type'] not in ['object', 'array']:
            return TiramisuOption(self.config,
                                  schema,
                                  self.model,
                                  self.form,
                                  self.temp,
                                  path,
                                  index)
        return TiramisuOptionDescription(self.config,
                                         schema,
                                         self.model,
                                         self.form,
                                         self.temp,
                                         path)
class ContextValue(_Value):
    """Entry point to values: ``config.value``.

    The top-level schema is wrapped in a ``{'properties': ...}`` dict and
    ``path`` is derived from the first schema key.
    """

    def __init__(self,
                 config: 'Config',
                 model: Dict,
                 form: Dict,
                 schema: Dict,
                 temp: Dict) -> None:
        self.config = config
        self.model = model
        self.form = form
        # default to '' so that an empty schema does not raise StopIteration
        first = next(iter(schema), '')
        self.path = first.rsplit('.', 1)[0]
        self.schema = {'properties': schema}
        self.temp = temp

    def __call__(self) -> TiramisuOptionValue:
        """Return a value API bound to the root path, without index."""
        # ``path`` and ``index`` used to be undefined names here (NameError)
        return TiramisuOptionValue(self.config,
                                   self.schema,
                                   self.model,
                                   self.form,
                                   self.temp,
                                   self.path,
                                   None)

    def mandatory(self):
        """Yield the path of each visible required option lacking a value.

        A required option is reported when its value is ``None`` or when its
        value is a list containing ``None`` or ``''``.
        """
        for key, value in self.dict().items():
            if not self.model.get(key, {}).get('required'):
                continue
            # The list test replaces a lookup of 'isMulti' on the wrapper
            # schema, which never carries that key.
            if value is None or \
                    (isinstance(value, list) and (None in value or '' in value)):
                yield key
class Config:
    """Client-side configuration built from a JSON description.

    Args:
        json: dict with three entries:
            ``model``: list of dicts, each with a ``key`` (option path);
            ``form``: list of dicts; those having a ``key`` are indexed by
            it and their ``pattern`` (if any) is compiled in place;
            ``schema``: dict of schema entries keyed by option path.

    Attributes:
        model_ori: the ``model`` list as received (sent back by remote updates).
        model: model entries keyed by option path.
        form: form entries keyed by option path.
        temp: local, not yet sent state (``value``, ``owner``, ``hidden``,
            ``error``) keyed by option path.
        updates: pending update records for ``send_data``.
        root: common path prefix deduced from the first schema key
            (``''`` when that key holds no dot or the schema is empty).
    """

    def __init__(self,
                 json):
        self.model_ori = json['model']
        self.model = {option['key']: option for option in json['model']}
        self.form = {}
        for option in json['form']:
            if 'key' in option:
                if 'pattern' in option:
                    option['pattern'] = re.compile(option['pattern'])
                self.form[option['key']] = option
        self.temp = {}
        self.schema = json['schema']
        self.updates = []
        # default to '' so that an empty schema does not raise StopIteration
        first_path = next(iter(self.schema), '')
        if '.' in first_path:
            self.root = first_path.rsplit('.', 1)[0]
        else:
            self.root = ''

    def __getattr__(self,
                    subfunc: str) -> Any:
        """Build the ``property``, ``option`` or ``value`` sub-API.

        Raises:
            APIError: any other attribute name.
        """
        if subfunc == 'property':
            return TiramisuContextProperty()
        if subfunc == 'option':
            return ContextOption(self,
                                 self.model,
                                 self.form,
                                 self.schema,
                                 self.temp)
        if subfunc == 'value':
            return ContextValue(self,
                                self.model,
                                self.form,
                                self.schema,
                                self.temp)
        raise APIError(_('please specify a valid sub function ({})').format(subfunc))

    def add_value(self,
                  path: str,
                  index: Optional[int],
                  value: Any,
                  remote: bool) -> None:
        """Record an ``add`` update for ``path``."""
        self.updates_value('add',
                           path,
                           index,
                           value,
                           remote,
                           None)

    def modify_value(self,
                     path: str,
                     index: Optional[int],
                     value: Any,
                     remote: bool) -> None:
        """Record a ``modify`` update for ``path``.

        When ``value`` is a list ending with ``undefined``, that last item is
        replaced IN PLACE by the schema's ``defaultvalue`` or, failing that,
        by the item at the same position in the schema's ``value`` list
        (``None`` when neither exists).
        """
        schema = self.get_schema(path)
        if value and isinstance(value, list) and value[-1] is undefined:
            new_value = schema.get('defaultvalue')
            if new_value is None:
                len_value = len(value)
                schema_value = schema.get('value', [])
                if len(schema_value) >= len_value:
                    new_value = schema_value[len_value - 1]
            value[-1] = new_value

        self.updates_value('modify',
                           path,
                           index,
                           value,
                           remote,
                           None)

    def delete_value(self,
                     path: str,
                     index: Optional[int],
                     remote: bool) -> None:
        """Record a ``delete`` update, restoring the schema's ``value``
        (``[]`` for a multi option without one)."""
        schema = self.get_schema(path)
        value = schema.get('value')
        if value is None and schema.get('isMulti', False):
            value = []
        self.updates_value('delete',
                           path,
                           index,
                           value,
                           remote,
                           None)

    def get_schema(self,
                   path):
        """Return the schema entry of ``path``.

        The schema is nested: each level's ``properties`` is keyed by full
        path, so the walk goes one path component at a time below ``root``.

        Raises:
            Exception: ``path`` is not located under ``root``.
            KeyError: a level of ``path`` is missing from the schema.
        """
        root_path = self.root
        schema = {'properties': self.schema,
                  'type': 'object'}
        if root_path:
            # Compare strings: ``startswith`` was given a list, which raises
            # TypeError.  The trailing dot avoids matching e.g. 'ab.c'
            # against root 'a'.
            if path != root_path and not path.startswith(root_path + '.'):
                raise Exception('cannot find {0}'.format(path))
            subpaths = path.split('.')[len(root_path.split('.')):]
        else:
            subpaths = path.split('.')
        for subpath in subpaths:
            if root_path:
                root_path += '.' + subpath
            else:
                root_path = subpath
            schema = schema['properties'][root_path]
        return schema

    def get_hidden(self,
                   path: str) -> bool:
        """Return the ``hidden`` flag of ``path``: from ``temp`` when set
        there, else from the model (``None`` when the model has none)."""
        property_ = 'hidden'
        if property_ in self.temp.get(path, {}):
            value = self.temp[path][property_]
        else:
            value = self.model.get(path, {}).get(property_)
        return value

    def get_value(self,
                  path: str,
                  schema: Dict) -> Any:
        """Return the value of ``path``: from ``temp`` when set there, else
        from the model.  A missing value of a multi option (``schema`` has
        ``isMulti``) is returned as ``[]``."""
        if 'value' in self.temp.get(path, {}):
            value = self.temp[path]['value']
        else:
            value = self.model.get(path, {}).get('value')
        if value is None and schema.get('isMulti', False):
            value = []
        return value

    def get_owner(self,
                  path: str) -> str:
        """Return the owner of ``path``: from ``temp`` when set there, else
        from the model, else ``'default'``."""
        if 'owner' in self.temp.get(path, {}):
            owner = self.temp[path]['owner']
        else:
            owner = self.model.get(path, {}).get('owner', 'default')
        return owner

    def updates_value(self,
                      action: str,
                      path: str,
                      index: Optional[int],
                      value: Optional[Any],
                      remote: bool,
                      masterslaves: Optional[str]) -> None:
        """Queue an update, then send it (remote) or apply it locally.

        The new update is merged into the last queued one when both target
        the same path/index and the actions are compatible; otherwise a new
        record is appended.  If the form defines a ``pattern`` for ``path``
        and the value does not match, nothing is sent or stored in ``temp``.
        """
        update_last_action = False
        if self.updates:
            last_body = self.updates[-1]
            if last_body['name'] == path:
                if index is None and 'index' not in last_body:
                    last_action = last_body['action']
                    if last_action == action or \
                            last_action in ['delete', 'modify'] and action in ['delete', 'modify']:
                        update_last_action = True
                elif index is None and action == 'delete':
                    # drop the trailing updates that the delete makes useless
                    # NOTE(review): the masterslaves test looks at ``path``,
                    # not at the queued update's name - confirm the intent.
                    while self.updates:
                        update = self.updates[-1]
                        if masterslaves is None and update['name'] == path or \
                                masterslaves and path.startswith(masterslaves + '.'):
                            del self.updates[-1]
                        else:
                            break
                # ``.get``: the last record has no 'index' key when it was
                # queued without index (direct access raised KeyError)
                elif last_body.get('index') == index:
                    if last_body['action'] == 'add' and action == 'modify':
                        action = 'add'
                        update_last_action = True
                    elif last_body['action'] == action and action != 'delete' or \
                            last_body['action'] == 'modify' and action == 'delete':
                        update_last_action = True
                    elif last_body['action'] == 'add' and action == 'delete':
                        del self.updates[-1]

        if update_last_action:
            if value is None:
                if 'value' in last_body:
                    del last_body['value']
            else:
                last_body['value'] = value
            if index is None and 'index' in last_body:
                del last_body['index']
            last_body['action'] = action
        else:
            data = {'action': action,
                    'name': path}
            if value is not None:
                data['value'] = value
            if index is not None:
                data['index'] = index
            self.updates.append(data)

        if 'pattern' in self.form.get(path, {}) and (not isinstance(value, list) or undefined not in value):
            match = self.test_value(path, value, remote)
        else:
            match = True
        if match:
            if remote:
                ret = self.send_data({'updates': self.updates,
                                      'model': self.model_ori})
                self.updates = []
                # cleared in place: the dict is shared with the sub-APIs
                self.temp.clear()
                # FIXME remove old key ?
                for model in ret['model']:
                    self.model[model['key']] = model
                self.model_ori = ret['model']
            else:
                self.temp.setdefault(path, {})['owner'] = 'tmp'
                self.temp[path]['value'] = value
                self.set_dependencies(path, value)
                self.set_not_equal(path, value)
                self.do_copy(path, value)

    def test_value(self,
                   path: str,
                   value: Any,
                   remote: bool):
        """Check ``value`` against the form's compiled ``pattern`` of ``path``.

        A list is valid when all its items are; ``None`` is always valid.
        For a local (not ``remote``) check, ``temp[path]['error']`` is set on
        mismatch and removed on match.

        Returns:
            A truthy value (``True`` or a match object) when valid, a falsy
            one (``False`` or ``None``) otherwise.
        """
        if isinstance(value, list):
            for val in value:
                if not self.test_value(path, val, remote):
                    return False
            return True
        if value is None:
            match = True
        else:
            match = self.form[path]['pattern'].search(value)
        if not remote:
            if not match:
                self.temp.setdefault(path, {})['error'] = ['']
            # the error lives in ``temp``: testing the model instead either
            # raised KeyError or left a stale error behind
            elif 'error' in self.temp.get(path, {}):
                del self.temp[path]['error']
        return match

    def set_dependencies(self,
                         path: str,
                         ori_value: Any,
                         force_hide: bool=False) -> None:
        """Apply the form's ``dependencies`` of ``path`` for ``ori_value``.

        The ``hide``/``show`` path lists selected by the value (or the
        ``default`` ones) set ``temp[...]['hidden']``; the change is then
        propagated recursively to the dependencies of those paths.  With
        ``force_hide`` every listed path is hidden.
        """
        dependencies = self.form.get(path, {}).get('dependencies', {})
        if dependencies:
            if ori_value in dependencies['expected']:
                expected = dependencies['expected'][ori_value]
            else:
                expected = dependencies['default']
            for action in ['hide', 'show']:
                expected_actions = expected.get(action)
                if expected_actions:
                    if force_hide:
                        hidden = True
                    else:
                        hidden = action == 'hide'
                    for expected_path in expected_actions:
                        self.temp.setdefault(expected_path, {})['hidden'] = hidden
                        if 'value' in self.temp[expected_path]:
                            value = self.temp[expected_path]['value']
                        else:
                            value = self.model[expected_path].get('value')
                        self.set_dependencies(expected_path, value, hidden)

    def set_not_equal(self,
                      path: str,
                      value: Any) -> None:
        """Apply the form's ``not_equal`` constraint of ``path``.

        The new ``value`` and the values of the constraint's ``options`` are
        compared pairwise (``None`` values never clash).  Clashing options
        get a message appended to the ``error`` (or ``warnings``, if the
        constraint has ``warnings`` set) list of their model entry; without
        any clash those two lists are removed from the model entries of all
        compared options.
        """
        not_equal = self.form.get(path, {}).get('not_equal', {})
        if not not_equal:
            return
        # flatten every value to compare, remembering which option owns it
        vals = []
        opts = []
        if isinstance(value, list):
            for val in value:
                vals.append(val)
                opts.append(path)
        else:
            vals.append(value)
            opts.append(path)
        for path_ in self.form[path]['not_equal']['options']:
            schema = self.get_schema(path_)
            p_value = self.get_value(path_, schema)
            if isinstance(p_value, list):
                for val in p_value:
                    vals.append(val)
                    opts.append(path_)
            else:
                vals.append(p_value)
                opts.append(path_)
        equal = []
        warnings_only = self.form[path]['not_equal'].get('warnings', False)
        if warnings_only:
            msg = _('should be different from the value of "{}"')
            msgcurr = _('value for {} should be different')
        else:
            msg = _('must be different from the value of "{}"')
            msgcurr = _('value for {} must be different')
        for idx_inf, val_inf in enumerate(vals):
            for idx_sup, val_sup in enumerate(vals[idx_inf + 1:], idx_inf + 1):
                if val_inf == val_sup and val_sup is not None:
                    for opt_ in [opts[idx_inf], opts[idx_sup]]:
                        if opt_ not in equal:
                            equal.append(opt_)
        if equal:
            equal_name = {}
            for opt in equal:
                schema = self.get_schema(opt)
                equal_name[opt] = schema['title']
            for opt_ in equal:
                # titles of the OTHER clashing options (the option's own
                # title used to be listed instead)
                display_equal = []
                for opt__ in equal:
                    if opt_ != opt__:
                        display_equal.append(equal_name[opt__])
                display_equal = ', '.join(display_equal)
                if opt_ == path:
                    msg_ = msgcurr.format(display_equal)
                else:
                    msg_ = msg.format(display_equal)
                if warnings_only:
                    self.model[opt_].setdefault('warnings', []).append(msg_)
                else:
                    self.model[opt_].setdefault('error', []).append(msg_)
        else:
            for opt in opts:
                if 'warnings' in self.model[opt]:
                    del self.model[opt]['warnings']
                if 'error' in self.model[opt]:
                    del self.model[opt]['error']

    def do_copy(self,
                path: str,
                value: Any) -> None:
        """Copy ``value`` to each path of the form's ``copy`` list whose
        owner is still ``'default'``."""
        copy = self.form.get(path, {}).get('copy')
        if copy:
            for opt in copy:
                owner = self.get_owner(opt)
                if owner == 'default':
                    # written to the model, not to ``temp``: it is a default value
                    self.model[opt]['value'] = value

    def send_data(self,
                  updates):
        """Send the pending updates; must be overridden.

        ``updates_value`` calls it with a dict holding ``updates`` and
        ``model`` and reads the ``model`` list of the returned dict.

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError('please implement send_data function')