# tiramisu-api-python/tiramisu_json_api/api.py
# (repository listing: 863 lines, 31 KiB, Python)

from typing import Optional, Dict, List, Any
from copy import copy
import warnings
import re
from .error import APIError, ValueWarning, ValueOptionError, ValueErrorWarning, PropertiesOptionError
from .setting import undefined
from .i18n import _
# Maps a schema option type to the Python type a value of that option must be
# an instance of.  'choice' is intentionally absent: choice values are checked
# against the schema's 'enum' list instead.  'date', 'url' and 'username' are
# text options too; without them validating such a value raised KeyError.
TYPE = {'boolean': bool,
        'number': int,
        'string': str,
        'password': str,
        'domain': str,
        'date': str,
        'url': str,
        'username': str}
class Option:
    """Lightweight stand-in for a real option (IntOption, StrOption, ...).

    It only carries a path and is only useful as the option argument of
    warning objects.
    """

    def __init__(self, path):
        # dotted path of the option this placeholder represents
        self.path = path

    def __call__(self):
        # the consumer is supposed to receive a weakref and call it to get
        # the option back; calling the placeholder simply returns itself
        return self

    def impl_getpath(self):
        """Return the path given at construction time."""
        return self.path
class TiramisuOptionOption:
    """Read-only description of one option: ``config.option(path).option``.

    ``schema`` is the schema node of the option, ``model`` the whole model
    mapping (path -> model entry) and ``form`` the form mapping.
    """

    def __init__(self,
                 config: 'Config',
                 path: str,
                 schema: Dict,
                 model: Dict,
                 form: Dict) -> None:
        self.config = config
        self._path = path
        self.schema = schema
        self.model = model
        self.form = form

    def doc(self):
        """Return the schema's ``title``."""
        return self.schema['title']

    def path(self):
        """Return the full dotted path of the option."""
        return self._path

    def name(self,
             follow_symlink: bool=False) -> str:
        """Return the last component of the option path.

        With ``follow_symlink`` set and a symlink option, the name is taken
        from the schema's ``opt_path`` instead of the option's own path.
        """
        if follow_symlink and \
                not self.isoptiondescription() and \
                self.issymlinkoption():
            path = self.schema['opt_path']
        else:
            path = self._path
        return path.rsplit('.', 1)[-1]

    def isoptiondescription(self):
        """Tell whether the schema type is a container ('object' or 'array')."""
        return self.schema['type'] in ['object', 'array']

    def ismasterslaves(self):
        """Tell whether the schema type is 'array'."""
        return self.schema['type'] == 'array'

    def issymlinkoption(self) -> bool:
        """Tell whether the schema type is 'symlink'."""
        return self.schema['type'] == 'symlink'

    def ismulti(self) -> bool:
        """Return the schema's ``isMulti`` flag (False when absent)."""
        return self.schema.get('isMulti', False)

    def type(self) -> str:
        """Translate the schema type into the corresponding short type name.

        Raises:
            Exception: when the schema type is not one of the known types.
        """
        if self.ismasterslaves():
            return 'masterslaves'
        if self.isoptiondescription():
            return 'optiondescription'
        types = {'number': 'int',
                 'choice': 'choice',
                 'boolean': 'bool',
                 'password': 'password',
                 'date': 'date',
                 'domain': 'domainname',
                 'url': 'url',
                 'username': 'username',
                 'string': 'str',
                 'symlink': 'symlink'}
        if self.schema['type'] in types:
            return types[self.schema['type']]
        raise Exception('unsupported type {}'.format(self.schema['type']))

    def properties(self) -> List[str]:
        """Return the properties of this option as computed by the config.

        ``get_properties`` reads keys such as 'properties' or 'required' from
        the mapping it is given, so it must receive this option's own model
        entry rather than the whole path -> entry mapping.
        """
        model = self.model.get(self._path, {})
        return self.config.get_properties(model, self._path)

    def requires(self) -> None:
        # FIXME: not implemented, always returns None
        return None
class TiramisuOptionProperty:
    """Property accessor of one option: ``config.option(path).property``.

    ``model`` is the model entry handed over by the caller for this path.
    """

    def __init__(self,
                 config: 'Config',
                 path: str,
                 model: Dict) -> None:
        self.model = model
        self.path = path
        self.config = config

    def get(self, only_raises=False):
        """Return the list of property names of the option.

        With ``only_raises`` set the list starts empty; otherwise it starts
        from ``config.get_properties``.  In both cases 'hidden' is appended
        when the config reports the path as hidden.
        """
        props = [] if only_raises else \
            self.config.get_properties(self.model, self.path, only_raises)
        is_hidden = self.config.get_hidden(self.path)
        if is_hidden:
            props.append('hidden')
        return props
class _Value:
def _dict_walk(self,
ret: Dict,
schema: Dict,
root: str,
fullpath: bool,
withwarning: bool,
parent_key: str=None) -> None:
len_master = None
for key, option in schema['properties'].items():
hidden = self.temp.get(key, {}).get('hidden', None)
model_display = not self.model.get(key, {}).get('hidden', False) and \
self.model.get(key, {}).get('display', True)
if hidden is False or (hidden is None and model_display is True):
if option['type'] in ['object', 'array']:
self._dict_walk(ret,
option,
root,
fullpath,
withwarning,
key)
else:
value = self.config.get_value(key, len_master)
if parent_key is not None and schema.get('type') == 'array' and len_master is None:
len_master = len(value)
self._display_warnings(key, value, option['type'], option['name'], withwarning)
ret[key] = value
elif parent_key is not None and schema.get('type') == 'array' and len_master is None:
break
def dict(self,
fullpath: bool=False,
withwarning: bool=False):
ret = {}
self._dict_walk(ret,
self.schema,
self.path,
fullpath,
withwarning)
return ret
def _display_warnings(self, path, value, type, name, withwarning=True):
for err in self.model.get(path, {}).get('error', []):
warnings.warn_explicit(ValueErrorWarning(value,
type,
Option(path),
'{0}'.format(err)),
ValueErrorWarning,
self.__class__.__name__, 0)
if withwarning and self.model.get(path, {}).get('warnings'):
for warn in self.model.get(path, {}).get('warnings'):
warnings.warn_explicit(ValueErrorWarning(value,
type,
Option(path),
'{0}'.format(err)),
ValueErrorWarning,
self.__class__.__name__, 0)
class TiramisuOptionOwner:
    """Owner accessor of one option: ``config.option(path).owner``."""

    def __init__(self,
                 config: 'Config',
                 schema: Dict,
                 model: List[Dict],
                 form: List[Dict],
                 temp: List[Dict],
                 path: str,
                 index: int) -> None:
        # location of the option
        self.path = path
        self.index = index
        # shared state coming from the config
        self.config = config
        self.schema = schema
        self.model = model
        self.form = form
        self.temp = temp

    def isdefault(self) -> Any:
        """Tell whether the owner reported by the config is 'default'."""
        owner = self.config.get_owner(self.path)
        return owner == 'default'

    def get(self) -> str:
        """Return the owner reported by the config for this path."""
        owner = self.config.get_owner(self.path)
        return owner
class TiramisuOptionValue(_Value):
    """Value accessor of one option: ``config.option(path).value``."""

    def __init__(self,
                 config: 'Config',
                 schema: Dict,
                 model: List[Dict],
                 form: List[Dict],
                 temp: List[Dict],
                 path: str,
                 index: int) -> None:
        # location of the option
        self.path = path
        self.index = index
        # shared state coming from the config
        self.config = config
        self.schema = schema
        self.model = model
        self.form = form
        self.temp = temp

    def get(self) -> Any:
        """Return the current value, emitting stored errors/warnings."""
        current = self.config.get_value(self.path)
        self._display_warnings(self.path,
                               current,
                               self.schema['type'],
                               self.schema['name'])
        return current

    def list(self):
        """Return the schema's 'enum' list."""
        return self.schema['enum']

    def _validate(self, type_, value):
        """Raise ``Exception`` when ``value`` does not fit ``type_``.

        ``None`` and ``undefined`` are always accepted.  A 'choice' must be
        one of the schema's 'enum' entries; any other type is checked with
        ``isinstance`` against the ``TYPE`` table.
        """
        if value in [None, undefined]:
            return
        if type_ != 'choice':
            if not isinstance(value, TYPE[type_]):
                raise Exception('value {} is not a valid {} '.format(value, type_))
            return
        if value not in self.schema['enum']:
            raise Exception('value {} is not in {}'.format(value, self.schema['enum']))

    def set(self, value):
        """Validate ``value``, hand it to the config, then emit warnings.

        For a multi option ``value`` must be a list and every item is
        validated on its own.
        """
        type_ = self.schema['type']
        if not self.schema.get('isMulti', False):
            self._validate(type_, value)
        elif not isinstance(value, list):
            raise Exception('value must be a list')
        else:
            for item in value:
                self._validate(type_, item)
        # the form entry of the option decides whether the change is remote
        is_remote = self.form.get(self.path, {}).get('remote', False)
        self.config.modify_value(self.path, self.index, value, is_remote)
        self._display_warnings(self.path, value, type_, self.schema['name'])

    def reset(self):
        """Ask the config to delete the value of this option."""
        is_remote = self.form.get(self.path, {}).get('remote', False)
        self.config.delete_value(self.path, self.index, is_remote)
class _Option:
def list(self,
type='option'):
if type not in ['all', 'option']:
raise NotImplementedError()
for path, schema in self.schema['properties'].items():
if type == 'all' or schema['type'] not in ['object', 'array']:
hidden = self.temp.get(path, {}).get('hidden', None)
model_display = not self.model.get(path, {}).get('hidden', False) and \
self.model.get(path, {}).get('display', True)
if hidden is False or (hidden is None and model_display is True):
if schema['type'] in ['object', 'array']:
yield TiramisuOptionDescription(self.config,
schema,
self.model,
self.form,
self.temp,
path)
else:
yield TiramisuOption(self.config,
schema,
self.model,
self.form,
self.temp,
path,
self.index)
class TiramisuOptionDescription(_Option):
    """Accessor returned by ``config.option(path)`` for a container path."""

    def __init__(self,
                 config: 'Config',
                 schema: Dict,
                 model: List[Dict],
                 form: List[Dict],
                 temp: List[Dict],
                 path: str) -> None:
        self.path = path
        # a container never carries an index
        self.index = None
        self.config = config
        self.schema = schema
        self.model = model
        self.form = form
        self.temp = temp

    def __getattr__(self,
                    subfunc: str) -> Any:
        """Build the sub-accessor named ``subfunc`` on demand.

        Supported names are 'option', 'property' and 'value'; any other
        missing attribute raises ``APIError``.
        """
        if subfunc == 'value':
            return TiramisuOptionValue(self.config,
                                       self.schema,
                                       self.model,
                                       self.form,
                                       self.temp,
                                       self.path,
                                       self.index)
        if subfunc == 'property':
            own_model = self.model.get(self.path, {})
            return TiramisuOptionProperty(self.config,
                                          self.path,
                                          own_model)
        if subfunc == 'option':
            return TiramisuOptionOption(self.config,
                                        self.path,
                                        self.schema,
                                        self.model,
                                        self.form)
        raise APIError(_('please specify a valid sub function ({})').format(subfunc))

    def group_type(self):
        """Return 'default' when the container is visible.

        Raises:
            PropertiesOptionError: when the container is hidden.
        """
        # a 'hidden' flag in temp overrides what the model says
        hidden = self.temp.get(self.path, {}).get('hidden', None)
        model_entry = self.model.get(self.path, {})
        model_display = not model_entry.get('hidden', False) and \
            model_entry.get('display', True)
        if not (hidden is False or (hidden is None and model_display)):
            raise PropertiesOptionError(None, None, None, opt_type='optiondescription')
        # FIXME: the real group type is not available, 'default' is assumed
        return 'default'
class TiramisuOption:
    """Accessor returned by ``config.option(path)`` for a plain option."""

    def __init__(self,
                 config: 'Config',
                 schema: Dict,
                 model: List[Dict],
                 form: List[Dict],
                 temp: List[Dict],
                 path: str,
                 index: Optional[int]) -> None:
        self.config = config
        self.schema = schema
        self.model = model
        self.form = form
        self.temp = temp
        self.path = path
        self.index = index

    def __getattr__(self,
                    subfunc: str) -> Any:
        """Build the sub-accessor named ``subfunc`` on demand.

        Supported names are 'option', 'value', 'owner' and 'property'.

        Raises:
            NotImplementedError: for 'option' and 'property' when the
                accessor was created with an index.
            APIError: for any other missing attribute.
        """
        if subfunc == 'option':
            # identity test: the index may legitimately be 0
            if self.index is not None:
                raise NotImplementedError()
            return TiramisuOptionOption(self.config,
                                        self.path,
                                        self.schema,
                                        self.model,
                                        self.form)
        if subfunc == 'value':
            return TiramisuOptionValue(self.config,
                                       self.schema,
                                       self.model,
                                       self.form,
                                       self.temp,
                                       self.path,
                                       self.index)
        if subfunc == 'owner':
            return TiramisuOptionOwner(self.config,
                                       self.schema,
                                       self.model,
                                       self.form,
                                       self.temp,
                                       self.path,
                                       self.index)
        if subfunc == 'property':
            if self.index is not None:
                raise NotImplementedError()
            return TiramisuOptionProperty(self.config,
                                          self.path,
                                          self.model.get(self.path, {}))
        raise APIError(_('please specify a valid sub function ({})').format(subfunc))
class TiramisuContextProperty:
    """Context-level property accessor: ``config.property``.

    The accessor currently keeps no state; a constructor taking the JSON
    document exists only as commented-out code in the original.
    """

    # def __init__(self,
    #              json):
    #     self.json = json

    def get(self):
        """Return the context properties (a fixed one-item list)."""
        # FIXME ? the list is hard-coded
        context_properties = ['demoting_error_warning']
        return context_properties
class ContextOption(_Option):
    """Entry point for options: ``config.option`` / ``config.option(path)``."""

    def __init__(self,
                 config: 'Config',
                 model: Dict,
                 form: Dict,
                 schema: Dict,
                 temp: Dict) -> None:
        self.config = config
        # the root schema is wrapped so it looks like a container node
        self.schema = {'properties': schema}
        self.model = model
        self.form = form
        self.temp = temp
        self.index = None

    def __call__(self,
                 path: str,
                 index: Optional[int]=None) -> TiramisuOption:
        """Return the accessor matching the schema node found at ``path``.

        A container node ('object' / 'array') gives a
        ``TiramisuOptionDescription`` (``index`` is then ignored); anything
        else gives a ``TiramisuOption``.
        """
        node = self.config.get_schema(path)
        if node['type'] not in ['object', 'array']:
            return TiramisuOption(self.config,
                                  node,
                                  self.model,
                                  self.form,
                                  self.temp,
                                  path,
                                  index)
        return TiramisuOptionDescription(self.config,
                                         node,
                                         self.model,
                                         self.form,
                                         self.temp,
                                         path)
class ContextValue(_Value):
    """Context-level value accessor: ``config.value``."""

    def __init__(self,
                 config: 'Config',
                 model: Dict,
                 form: Dict,
                 schema: Dict,
                 temp: Dict) -> None:
        self.config = config
        self.model = model
        self.form = form
        # the text before the last dot of the first schema key is used as
        # path; an empty schema gives an empty path instead of StopIteration
        first = next(iter(schema), '')
        self.path = first.rsplit('.', 1)[0]
        # the root schema is wrapped so it looks like a container node
        self.schema = {'properties': schema}
        self.temp = temp

    def __call__(self) -> TiramisuOptionValue:
        """Return a value accessor bound to this object's path.

        The call takes no argument, so the accessor is built with
        ``self.path`` and no index (the original body referenced the
        undefined names ``path`` and ``index`` and always failed).
        """
        return TiramisuOptionValue(self.config,
                                   self.schema,
                                   self.model,
                                   self.form,
                                   self.temp,
                                   self.path,
                                   None)

    def mandatory(self):
        """Yield the path of every required option that has no usable value.

        An option qualifies when its model entry has 'required' set and its
        value is ``None`` or is a list containing ``None`` or ``''``.
        """
        for key, value in self.dict().items():
            if not self.model.get(key, {}).get('required'):
                continue
            if value is None:
                yield key
            elif isinstance(value, list) and (None in value or '' in value):
                # the original tested 'isMulti' on the root wrapper, which
                # never has that key, so list values were never reported
                yield key
class Config:
    """Client-side configuration built from a JSON description.

    The description has three parts: ``schema`` (tree of options keyed by
    full path), ``model`` (list of per-option entries) and ``form`` (list of
    per-option form entries).  Local changes are kept in ``self.temp`` and
    queued in ``self.updates``; remote changes go through ``send_data``,
    which must be provided by a subclass.
    """

    # config
    def __init__(self,
                 json):
        self.model_ori = json['model']
        self.model = self.gen_model(json['model'])
        self.form = {}
        for option in json['form']:
            if 'key' in option:
                if 'pattern' in option:
                    # work on a copy so the caller's document keeps its
                    # plain-string pattern
                    option = dict(option)
                    option['pattern'] = re.compile(option['pattern'])
                self.form[option['key']] = option
        self.temp = {}
        self.schema = json['schema']
        self.updates = []
        # the root is everything before the last dot of the first schema key
        first_path = next(iter(self.schema), '')
        if '.' in first_path:
            self.root = first_path.rsplit('.', 1)[0]
        else:
            self.root = ''

    def gen_model(self,
                  model) -> Dict[str, Dict]:
        """Turn the model list into a mapping keyed by option path.

        Entries carrying an 'index' are merged per key: their 'value' becomes
        a dict ``{index: (value, owner)}`` and the 'index' / 'owner' keys are
        dropped.  Other entries are stored as they are.
        """
        ret = {}
        for option in model:
            key = option['key']
            if 'index' in option:
                if key not in ret:
                    ret[key] = copy(option)
                    ret[key]['value'] = {ret[key]['index']: (ret[key]['value'], ret[key]['owner'])}
                    del ret[key]['index']
                    del ret[key]['owner']
                elif option.get('hidden') is not True:
                    ret[key]['value'][option['index']] = (option['value'], option['owner'])
            else:
                ret[key] = option
        return ret

    def __getattr__(self,
                    subfunc: str) -> Any:
        """Build the context accessor named ``subfunc`` on demand.

        Supported names are 'property', 'option' and 'value'; any other
        missing attribute raises ``APIError``.
        """
        if subfunc == 'property':
            return TiramisuContextProperty()
        if subfunc == 'option':
            return ContextOption(self,
                                 self.model,
                                 self.form,
                                 self.schema,
                                 self.temp)
        if subfunc == 'value':
            return ContextValue(self,
                                self.model,
                                self.form,
                                self.schema,
                                self.temp)
        raise APIError(_('please specify a valid sub function ({})').format(subfunc))

    def add_value(self,
                  path: str,
                  index: Optional[int],
                  value: Any,
                  remote: bool) -> None:
        """Record an 'add' action for ``path``."""
        self.updates_value('add',
                           path,
                           index,
                           value,
                           remote,
                           None)

    def modify_value(self,
                     path: str,
                     index: Optional[int],
                     value: Any,
                     remote: bool) -> None:
        """Record a 'modify' action for ``path``.

        When ``value`` is a list ending with ``undefined``, that last item is
        replaced in place by the schema's 'defaultvalue' or, failing that, by
        the item at the same position of the schema's 'value' list.
        """
        schema = self.get_schema(path)
        if value and isinstance(value, list) and value[-1] is undefined:
            new_value = schema.get('defaultvalue')
            if new_value is None:
                len_value = len(value)
                schema_value = schema.get('value', [])
                if len(schema_value) >= len_value:
                    new_value = schema_value[len_value - 1]
            value[-1] = new_value
        self.updates_value('modify',
                           path,
                           index,
                           value,
                           remote,
                           None)

    def delete_value(self,
                     path: str,
                     index: Optional[int],
                     remote: bool) -> None:
        """Record a 'delete' action, falling back to the schema's value."""
        schema = self.get_schema(path)
        value = schema.get('value')
        if value is None and schema.get('isMulti', False):
            value = []
        self.updates_value('delete',
                           path,
                           index,
                           value,
                           remote,
                           None)

    def get_properties(self,
                       model,
                       path,
                       only_raises=True):
        """Build the property list of ``path`` from its model entry.

        ``model`` is the entry of that single option.  'hidden' is only added
        when ``only_raises`` is true.
        """
        props = model.get('properties', [])[:]
        if model.get('required'):
            if self.get_schema(path).get('isMulti', False):
                props.append('empty')
            else:
                props.append('mandatory')
        if model.get('needs_len'):
            props.append('mandatory')
        if model.get('readOnly'):
            props.append('frozen')
        if only_raises and self.get_hidden(path):
            props.append('hidden')
        if self.form.get(path, {}).get('clearable'):
            props.append('clearable')
        return props

    def get_schema(self,
                   path):
        """Return the schema node of ``path``.

        Raises:
            Exception: when a root is set and ``path`` does not start with it.
            KeyError: when a component of the path is not in the schema.
        """
        root_path = self.root
        schema = {'properties': self.schema,
                  'type': 'object'}
        if root_path:
            root = self.root.split('.')
            if not path.startswith(self.root):
                raise Exception('cannot find {0}'.format(path))
            subpaths = path.split('.')[len(root):]
        else:
            subpaths = path.split('.')
        for subpath in subpaths:
            # schema nodes are keyed by full path at every level
            if root_path:
                root_path += '.' + subpath
            else:
                root_path = subpath
            schema = schema['properties'][root_path]
        return schema

    def get_hidden(self,
                   path: str) -> bool:
        """Return the 'hidden' flag of ``path``; temp wins over the model.

        The result is ``None`` when neither side defines the flag.
        """
        property_ = 'hidden'
        if property_ in self.temp.get(path, {}):
            value = self.temp[path][property_]
        else:
            value = self.model.get(path, {}).get(property_)
        return value

    def get_value(self,
                  path: str,
                  len_master: int=None) -> Any:
        """Return the value of ``path``.

        Without ``len_master`` the value comes from temp or, failing that,
        from the model (``[]`` for a multi option without value).  With
        ``len_master`` a list of that length is built from the per-index
        values, using ``None`` for missing indexes.
        """
        if len_master is None:
            if 'value' in self.temp.get(path, {}):
                value = self.temp[path]['value']
            else:
                value = self.model.get(path, {}).get('value')
            if value is None and self.get_schema(path).get('isMulti', False):
                value = []
        else:
            value = self.temp.get(path)
            if value is None:
                value = self.model.get(path)
            if value is None:
                value = [None] * len_master
            else:
                val = []
                for index in range(len_master):
                    if index in value['value']:
                        val.append(value['value'][index][0])
                    else:
                        val.append(None)
                value = val
        return value

    def get_owner(self,
                  path: str) -> str:
        """Return the owner of ``path`` (temp first, then model, else 'default')."""
        if 'owner' in self.temp.get(path, {}):
            owner = self.temp[path]['owner']
        else:
            owner = self.model.get(path, {}).get('owner', 'default')
        return owner

    def updates_value(self,
                      action: str,
                      path: str,
                      index: Optional[int],
                      value: Optional[Any],
                      remote: bool,
                      masterslaves: Optional[str]) -> None:
        """Queue an update, merging it with the previous one when possible.

        After queuing, the value is checked against the form's pattern (if
        any).  When it matches, the change is either sent with ``send_data``
        (``remote``) or stored in ``self.temp`` and propagated to
        dependencies, not-equal checks and copies.
        """
        update_last_action = False
        if self.updates:
            last_body = self.updates[-1]
            if last_body['name'] == path:
                if index is None and 'index' not in last_body:
                    last_action = last_body['action']
                    if last_action == action or \
                            last_action in ['delete', 'modify'] and action in ['delete', 'modify']:
                        update_last_action = True
                elif index is None and action == 'delete':
                    # drop the trailing updates, one at a time from the end
                    # NOTE(review): the masterslaves test looks at `path`, not
                    # at the inspected update's name - confirm the intent
                    while self.updates:
                        update = self.updates[-1]
                        if masterslaves is None and update['name'] == path or \
                                masterslaves and path.startswith(masterslaves + '.'):
                            del self.updates[-1]
                        else:
                            break
                elif last_body.get('index') == index:
                    # .get(): the previous update may have no index at all
                    # while the current one has (used to raise KeyError)
                    if last_body['action'] == 'add' and action == 'modify':
                        action = 'add'
                        update_last_action = True
                    elif last_body['action'] == action and action != 'delete' or \
                            last_body['action'] == 'modify' and action == 'delete':
                        update_last_action = True
                    elif last_body['action'] == 'add' and action == 'delete':
                        del self.updates[-1]
        if update_last_action:
            if value is None:
                if 'value' in last_body:
                    del last_body['value']
            else:
                last_body['value'] = value
            if index is None and 'index' in last_body:
                del last_body['index']
            last_body['action'] = action
        else:
            data = {'action': action,
                    'name': path}
            if value is not None:
                data['value'] = value
            if index is not None:
                data['index'] = index
            self.updates.append(data)
        if 'pattern' in self.form.get(path, {}) and (not isinstance(value, list) or undefined not in value):
            match = self.test_value(path, value, remote)
        else:
            match = True
        if match:
            if remote:
                self.updates_data(self.send_data({'updates': self.updates,
                                                  'model': self.model_ori}))
            else:
                self.temp.setdefault(path, {})['owner'] = 'tmp'
                self.temp[path]['value'] = value
                self.set_dependencies(path, value)
                self.set_not_equal(path, value)
                self.do_copy(path, value)

    def updates_data(self, data):
        """Replace the pending state with the model returned in ``data``.

        The new entries go through ``gen_model`` so that indexed entries get
        the same ``{index: (value, owner)}`` shape as after construction.
        """
        self.updates = []
        self.temp.clear()
        # FIXME remove old key ?
        self.model.update(self.gen_model(data['model']))
        self.model_ori = data['model']

    def test_value(self,
                   path: str,
                   value: Any,
                   remote: bool):
        """Check ``value`` (or each item of a list) against the form pattern.

        Returns a truthy result on success.  For local changes a failed match
        stores an 'error' marker in temp and a successful one removes it.
        """
        if isinstance(value, list):
            for val in value:
                if not self.test_value(path, val, remote):
                    return False
            return True
        if value is None:
            match = True
        else:
            match = self.form[path]['pattern'].search(value)
        if not remote:
            if not match:
                self.temp.setdefault(path, {})['error'] = ['']
            elif 'error' in self.temp.get(path, {}):
                # the marker lives in temp, so temp is what must be tested
                # (the original tested the model and could raise KeyError)
                del self.temp[path]['error']
        return match

    def set_dependencies(self,
                         path: str,
                         ori_value: Any,
                         force_hide: bool=False) -> None:
        """Apply the hide/show rules attached to ``path`` in the form.

        The rule set is picked by ``ori_value`` (or the 'default' one), the
        'hidden' flag of every listed path is stored in temp, and the rules of
        those paths are applied recursively; a hidden path forces its own
        dependents to be hidden.
        """
        dependencies = self.form.get(path, {}).get('dependencies', {})
        if dependencies:
            if ori_value in dependencies['expected']:
                expected = dependencies['expected'][ori_value]
            else:
                expected = dependencies['default']
            for action in ['hide', 'show']:
                expected_actions = expected.get(action)
                if expected_actions:
                    if force_hide:
                        hidden = True
                    else:
                        hidden = action == 'hide'
                    for expected_path in expected_actions:
                        self.temp.setdefault(expected_path, {})['hidden'] = hidden
                        if 'value' in self.temp[expected_path]:
                            value = self.temp[expected_path]['value']
                        else:
                            # tolerate a path that has no model entry
                            value = self.model.get(expected_path, {}).get('value')
                        self.set_dependencies(expected_path, value, hidden)

    def set_not_equal(self,
                      path: str,
                      value: Any) -> None:
        """Check the 'not_equal' constraint attached to ``path`` in the form.

        The values of ``path`` and of the listed options are compared; every
        option sharing a non-``None`` value gets a message naming the other
        ones, stored under 'warnings' or 'error' in its model entry.  When no
        value is shared, both keys are removed from the involved entries.
        """
        not_equal = self.form.get(path, {}).get('not_equal', {})
        if not not_equal:
            return
        vals = []
        opts = []
        if isinstance(value, list):
            for val in value:
                vals.append(val)
                opts.append(path)
        else:
            vals.append(value)
            opts.append(path)
        for path_ in self.form[path]['not_equal']['options']:
            # kept from the original: fails early on an unknown path
            self.get_schema(path_)
            p_value = self.get_value(path_)
            if isinstance(p_value, list):
                for val in p_value:
                    vals.append(val)
                    opts.append(path_)
            else:
                vals.append(p_value)
                opts.append(path_)
        equal = []
        warnings_only = self.form[path]['not_equal'].get('warnings', False)
        if warnings_only:
            msg = _('should be different from the value of "{}"')
            msgcurr = _('value for {} should be different')
        else:
            msg = _('must be different from the value of "{}"')
            msgcurr = _('value for {} must be different')
        for idx_inf, val_inf in enumerate(vals):
            for idx_sup, val_sup in enumerate(vals[idx_inf + 1:]):
                if val_inf == val_sup is not None:
                    for opt_ in [opts[idx_inf], opts[idx_inf + idx_sup + 1]]:
                        if opt_ not in equal:
                            equal.append(opt_)
        if equal:
            equal_name = {}
            for opt in equal:
                schema = self.get_schema(opt)
                equal_name[opt] = schema['title']
            for opt_ in equal:
                # titles of the *other* conflicting options (the original
                # appended the option's own title once per other option)
                display_equal = ', '.join(equal_name[opt__]
                                          for opt__ in equal
                                          if opt_ != opt__)
                if opt_ == path:
                    msg_ = msgcurr.format(display_equal)
                else:
                    msg_ = msg.format(display_equal)
                target = 'warnings' if warnings_only else 'error'
                self.model.setdefault(opt_, {}).setdefault(target, []).append(msg_)
        else:
            for opt in opts:
                entry = self.model.get(opt)
                if entry is None:
                    continue
                entry.pop('warnings', None)
                entry.pop('error', None)

    def do_copy(self,
                path: str,
                value: Any) -> None:
        """Copy ``value`` to the options listed under 'copy' in the form.

        Only options whose owner is still 'default' are changed.
        """
        # not named `copy`, to avoid shadowing the imported copy() function
        targets = self.form.get(path, {}).get('copy')
        if targets:
            for opt in targets:
                owner = self.get_owner(opt)
                if owner == 'default':
                    # do not change in self.temp, it's default value
                    self.model.setdefault(opt, {})['value'] = value

    def send_data(self,
                  updates):
        """Send ``updates`` to the server; must be provided by a subclass."""
        raise NotImplementedError('please implement send_data function')