tiramisu-api-python/tiramisu_json_api/api.py

724 lines
26 KiB
Python
Raw Normal View History

2018-12-24 09:28:44 +01:00
from typing import Optional, Dict, List, Any
import warnings
import re
from tiramisu.error import APIError, ValueWarning, ValueOptionError, ValueErrorWarning, PropertiesOptionError
from tiramisu.setting import undefined
2019-01-10 09:54:22 +01:00
# FIXME
def _(val):
return val
2018-12-24 09:28:44 +01:00
class Option:
2019-01-10 09:54:22 +01:00
# fake Option (IntOption, StrOption, ...)
2018-12-24 09:28:44 +01:00
def __init__(self,
name,
path):
self.name = name
self.path = path
def __call__(self):
return self
def impl_get_display_name(self):
return self.name
def impl_getpath(self):
return self.path
class TiramisuOptionOption:
2019-01-10 09:54:22 +01:00
# config.option(path).option
2018-12-24 09:28:44 +01:00
def __init__(self,
path: str,
2019-01-10 09:54:22 +01:00
schema: Dict,
model: Dict) -> None:
2018-12-24 09:28:44 +01:00
self._path = path
self.schema = schema
2019-01-10 09:54:22 +01:00
self.model = model
2018-12-24 09:28:44 +01:00
def doc(self):
return self.schema['title']
def path(self):
return self._path
2019-01-10 09:54:22 +01:00
def name(self):
return self._path.rsplit('.', 1)[-1]
2018-12-24 09:28:44 +01:00
def isoptiondescription(self):
return self.schema['type'] in ['object', 'array']
def ismasterslaves(self):
return self.schema['type'] == 'array'
def issymlinkoption(self) -> bool:
# FIXME
return False
def ismulti(self) -> bool:
return self.schema.get('isMulti', False)
def type(self) -> str:
if self.ismasterslaves():
return 'masterslaves'
if self.isoptiondescription():
return 'optiondescription'
types = {'number': 'int',
'choice': 'choice',
'boolean': 'bool',
'password': 'password',
'date': 'date',
'domain': 'domainname',
'url': 'url',
'username': 'username',
'string': 'str'}
if self.schema['type'] in types :
return types[self.schema['type']]
raise Exception('unsupported type {}'.format(self.schema['type']))
2019-01-10 09:54:22 +01:00
def properties(self) -> List[str]:
props = []
model = self.model[self._path]
if model.get('required'):
#FIXME 'empty', 'needs_len'
props.append('mandatory')
if model.get('readOnly'):
props.append('frozen')
if model.get('hidden'):
props.append('hidden')
return props
def requires(self) -> None:
# FIXME
return None
2018-12-24 09:28:44 +01:00
class TiramisuOptionProperty:
2019-01-10 09:54:22 +01:00
# config.option(path).property
2018-12-24 09:28:44 +01:00
def __init__(self,
model: Dict) -> None:
self.model = model
2019-01-10 09:54:22 +01:00
def get(self, only_raises=False):
2018-12-24 09:28:44 +01:00
# FIXME if slave:
#if not isslave and childapi.option.ismulti():
# if 'empty' in props:
# obj['required'] = True
# props.remove('empty')
# if 'mandatory' in props:
# obj['needs_len'] = True
# props.remove('mandatory')
2019-01-10 09:54:22 +01:00
if not only_raises:
properties = self.model.get('properties', [])[:]
if self.model.get('required', False):
properties.append('mandatory')
if self.model.get('readOnly', False):
properties.append('frozen')
else:
properties = []
2018-12-24 09:28:44 +01:00
if self.model.get('hidden', False):
properties.append('hidden')
return properties
2019-01-10 09:54:22 +01:00
class _Value:
def _dict_walk(self,
ret: Dict,
schema: Dict,
root: str,
fullpath: bool,
withwarning: bool):
for key, option in schema['properties'].items():
hidden = self.temp.get(key, {}).get('hidden', None)
model_hidden = not self.model.get(key, {}).get('hidden', False) and \
self.model.get(key, {}).get('display', True)
if hidden is False or (hidden is None and model_hidden):
if option['type'] == 'object':
self._dict_walk(ret,
option,
root,
fullpath,
withwarning)
else:
value = self._get_value(key, schema)
self._display_warnings(key, value, option['type'], option['name'], withwarning)
ret[key] = value
def dict(self,
fullpath: bool=False,
withwarning: bool=False):
ret = {}
self._dict_walk(ret,
self.schema,
self.path,
fullpath,
withwarning)
return ret
2018-12-24 09:28:44 +01:00
2019-01-10 09:54:22 +01:00
def _display_warnings(self, path, value, type, name, withwarning=True):
2018-12-24 09:28:44 +01:00
if self.model.get(path, {}).get('error'):
for err in self.model.get(path, {}).get('error'):
warnings.warn_explicit(ValueOptionError(value,
type,
Option(name, path),
'{0}'.format(err)),
ValueErrorWarning,
self.__class__.__name__, 0)
2019-01-10 09:54:22 +01:00
if withwarning and self.model.get(path, {}).get('warnings'):
2018-12-24 09:28:44 +01:00
for warn in self.model.get(path, {}).get('warnings'):
warnings.warn_explicit(ValueWarning('{0}'.format(warn),
Option(name, path)),
ValueWarning,
self.__class__.__name__, 0)
def _get_value(self,
path: str,
schema: Dict) -> Any:
if 'value' in self.temp.get(path, {}):
value = self.temp[path]['value']
else:
value = self.model.get(path, {}).get('value')
if value is None and schema.get('isMulti', False):
value = []
return value
2019-01-10 09:54:22 +01:00
class TiramisuOptionValue(_Value):
# config.option(path).value
def __init__(self,
config: 'Config',
schema: Dict,
model: List[Dict],
form: List[Dict],
temp: List[Dict],
path: str,
index: int) -> None:
self.config = config
self.schema = schema
if schema.get('properties') != None:
raise Exception('pouet')
self.model = model
self.form = form
self.temp = temp
self.path = path
self.index = index
2018-12-24 09:28:44 +01:00
def get(self) -> Any:
value = self._get_value(self.path, self.schema)
self._display_warnings(self.path, value, self.schema['type'], self.schema['name'])
return value
def list(self):
return self.schema['enum']
def set(self, value):
self.config.modify_value(self.path,
self.index,
value,
self.form.get(self.path, {}).get('remote', False))
self._display_warnings(self.path, value, self.schema['type'], self.schema['name'])
def reset(self):
self.config.delete_value(self.path,
self.index,
self.form.get(self.path, {}).get('remote', False))
2019-01-10 09:54:22 +01:00
class _Option:
def list(self,
type='option'):
if type not in ['all', 'option']:
raise NotImplementedError()
if self.schema.get('properties') is None:
raise Exception(list(self.schema.keys()))
for path, schema in self.schema['properties'].items():
if type == 'all' or schema['type'] not in ['object', 'array']:
hidden = self.temp.get(path, {}).get('hidden', None)
model_hidden = not self.model.get(path, {}).get('hidden', False) and \
self.model.get(path, {}).get('display', True)
if hidden is False or (hidden is None and model_hidden is True):
2019-01-10 09:54:22 +01:00
if schema['type'] in ['object', 'array']:
yield TiramisuOptionDescription(self.config,
schema,
self.model,
self.form,
self.temp,
path)
else:
yield TiramisuOption(self.config,
schema,
self.model,
self.form,
self.temp,
path,
self.index)
class TiramisuOptionDescription(_Option):
# config.option(path) (with path == OptionDescription)
def __init__(self,
config: 'Config',
schema: Dict,
model: List[Dict],
form: List[Dict],
temp: List[Dict],
path: str) -> None:
self.config = config
self.schema = schema
self.model = model
self.form = form
self.temp = temp
self.path = path
self.index = None
def __getattr__(self,
subfunc: str) -> Any:
if subfunc == 'option':
return TiramisuOptionOption(self.path,
self.schema,
self.model)
if subfunc == 'property':
return TiramisuOptionProperty(self.model.get(self.path, {}))
raise APIError(_('please specify a valid sub function ({})').format(subfunc))
def group_type(self):
hidden = self.temp.get(key, {}).get('hidden', None)
model_hidden = not self.model.get(key, {}).get('hidden', False) and \
self.model.get(key, {}).get('display', True)
if hidden is False or (hidden is None and model_hidden):
2019-01-10 09:54:22 +01:00
# FIXME
return 'default'
raise PropertiesOptionError(None, None, None, opt_type='optiondescription')
2018-12-24 09:28:44 +01:00
class TiramisuOption:
2019-01-10 09:54:22 +01:00
# config.option(path) (with path == Option)
2018-12-24 09:28:44 +01:00
def __init__(self,
config: 'Config',
schema: Dict,
model: List[Dict],
form: List[Dict],
temp: List[Dict],
path: str,
index: Optional[int]) -> None:
self.config = config
self.schema = schema
self.model = model
self.form = form
self.temp = temp
self.path = path
self.index = index
def __getattr__(self,
subfunc: str) -> Any:
if subfunc == 'option':
if self.index != None:
raise NotImplementedError()
return TiramisuOptionOption(self.path,
2019-01-10 09:54:22 +01:00
self.schema,
self.model)
2018-12-24 09:28:44 +01:00
if subfunc == 'value':
return TiramisuOptionValue(self.config,
self.schema,
self.model,
self.form,
self.temp,
self.path,
self.index)
if subfunc == 'property':
if self.index != None:
raise NotImplementedError()
return TiramisuOptionProperty(self.model.get(self.path, {}))
raise APIError(_('please specify a valid sub function ({})').format(subfunc))
class TiramisuContextProperty:
2019-01-10 09:54:22 +01:00
# config.property
2018-12-24 09:28:44 +01:00
# def __init__(self,
# json):
# self.json = json
def get(self):
# FIXME ?
return ['demoting_error_warning']
2019-01-10 09:54:22 +01:00
class ContextOption(_Option):
# config.option
def __init__(self,
config: 'Config',
model: Dict,
form: Dict,
schema: Dict,
temp: Dict) -> None:
self.config = config
self.model = model
self.form = form
schema = {'properties': schema}
self.schema = schema
self.temp = temp
self.index = None
def __call__(self,
path: str,
index: Optional[int]=None) -> TiramisuOption:
schema = self.config.get_schema(path)
if schema['type'] in ['object', 'array']:
return TiramisuOptionDescription(self.config,
schema,
self.model,
self.form,
self.temp,
path)
2019-01-10 09:54:22 +01:00
return TiramisuOption(self.config,
schema,
2019-01-10 09:54:22 +01:00
self.model,
self.form,
self.temp,
path,
index)
class ContextValue(_Value):
# config.value
def __init__(self,
config: 'Config',
model: Dict,
form: Dict,
schema: Dict,
temp: Dict) -> None:
self.config = config
self.model = model
self.form = form
first = next(iter(schema.keys()))
self.path = first.rsplit('.', 1)[0]
schema = {'properties': schema}
self.schema = schema
self.temp = temp
def __call__(self) -> TiramisuOptionValue:
return TiramisuOptionValue(self.config,
self.schema,
self.model,
self.form,
self.temp,
path,
index)
2018-12-24 09:28:44 +01:00
class Config:
2019-01-10 09:54:22 +01:00
# config
2018-12-24 09:28:44 +01:00
def __init__(self,
json):
self.model_ori = json['model']
self.model = {option['key']: option for option in json['model']}
self.form = {}
for option in json['form']:
if option.get('key'):
if 'pattern' in option:
option['pattern'] = re.compile(option['pattern'])
self.form[option['key']] = option
self.temp = {}
self.schema = json['schema']
self.updates = []
2019-01-10 09:54:22 +01:00
first_path = next(iter(self.schema.keys()))
if '.' in first_path:
self.root = first_path.rsplit('.', 1)[0]
else:
self.root = ''
2018-12-24 09:28:44 +01:00
def __getattr__(self,
subfunc: str) -> Any:
if subfunc == 'property':
return TiramisuContextProperty()
2019-01-10 09:54:22 +01:00
if subfunc == 'option':
return ContextOption(self,
self.model,
self.form,
self.schema,
self.temp)
if subfunc == 'value':
return ContextValue(self,
self.model,
self.form,
self.schema,
self.temp)
2018-12-24 09:28:44 +01:00
raise APIError(_('please specify a valid sub function ({})').format(subfunc))
def add_value(self,
path: str,
index: Optional[int],
value: Any,
remote: bool) -> None:
self.updates_value('add',
path,
index,
value,
remote,
None)
def modify_value(self,
path: str,
index: Optional[int],
value: Any,
remote: bool) -> None:
schema = self.get_schema(path)
if value and isinstance(value, list) and value[-1] is undefined:
new_value = schema.get('defaultvalue')
if new_value is None:
len_value = len(value)
if len(schema.get('value')) >= len_value:
new_value = schema.get('value')[len_value - 1]
value[-1] = new_value
self.updates_value('modify',
path,
index,
value,
remote,
None)
def delete_value(self,
path: str,
index: Optional[int],
remote: bool) -> None:
schema = self.get_schema(path)
value = schema['value'];
if value is None and schema.get('isMulti', False):
value = []
self.updates_value('delete',
path,
index,
value,
remote,
None)
def get_schema(self,
path):
2019-01-10 09:54:22 +01:00
root_path = self.root
schema = {'properties': self.schema}
2019-01-10 09:54:22 +01:00
if root_path:
root = self.root.split('.')
subpaths = path.split('.')[len(root):]
2018-12-24 09:28:44 +01:00
else:
subpaths = path.split('.')
for subpath in subpaths:
if root_path:
root_path += '.' + subpath
else:
root_path = subpath
schema = schema['properties'][root_path]
2018-12-24 09:28:44 +01:00
return schema
def updates_value(self,
action: str,
path: str,
index: Optional[int],
value: Optional[Any],
remote: bool,
masterslaves: Optional[str]) -> None:
update_last_action = False
if self.updates:
last_body = self.updates[-1]
if last_body['name'] == path:
if index is None and not 'index' in last_body:
last_action = last_body['action']
if last_action == action or \
last_action in ['delete', 'modify'] and action in ['delete', 'modify']:
update_last_action = True
elif index == None and action == 'delete':
for update in reversed(self.updates):
update['name']
if masterslaves == None and update['name'] == path or \
masterslaves and path.startswith(masterslaves + '.'):
del self.updates[-1]
else:
break
elif last_body['index'] == index:
if last_body['action'] == 'add' and action == 'modify':
action = 'add'
update_last_action = True
elif last_body['action'] == action and action != 'delete' or \
last_body['action'] == 'modify' and action == 'delete':
update_last_action = True
elif last_body['action'] == 'add' and action == 'delete':
2019-01-10 09:54:22 +01:00
del self.updates[-1]
2018-12-24 09:28:44 +01:00
if update_last_action:
if value is None:
if 'value' in last_body:
del last_body['value']
else:
last_body['value'] = value
if index is None and 'index' in last_body:
del last_body['index']
last_body['action'] = action
else:
data = {'action': action,
'name': path}
if value is not None:
data['value'] = value
if index is not None:
data['index'] = index
self.updates.append(data)
if 'pattern' in self.form.get(path, {}) and (not isinstance(value, list) or undefined not in value):
match = self.test_value(path, value, remote)
else:
match = True
if match:
if remote:
ret = self.send_data({'updates': self.updates,
'model': self.model_ori})
self.updates = []
self.temp.clear()
# FIXME remove old key ?
for model in ret['model']:
self.model[model['key']] = model
self.model_ori = ret['model']
else:
self.temp.setdefault(path, {})['owner'] = 'tmp'
self.temp[path]['value'] = value
self.set_dependencies(path, value)
2018-12-24 09:28:44 +01:00
self.set_not_equal(path, value)
self.do_copy(path, value)
def test_value(self,
path: str,
value: Any,
remote: bool):
if isinstance(value, list):
for val in value:
if not self.test_value(path, val, remote):
return False
return True
else:
if value is None:
match = True
else:
match = self.form[path]['pattern'].search(value)
if not remote:
if not match:
self.temp.setdefault(path, {})['error'] = ['']
elif 'error' in self.model[path]:
del self.temp[path]['error']
return match
def set_dependencies(self,
path: str,
value: Any,
force_hide: bool=False) -> None:
2018-12-24 09:28:44 +01:00
dependencies = self.form.get(path, {}).get('dependencies', {})
if dependencies:
if value in dependencies['expected']:
expected = dependencies['expected'][value]
else:
expected = dependencies['default']
for action in ['hide', 'show']:
expected_actions = expected.get(action)
if expected_actions:
if force_hide:
hidden = True
else:
hidden = action == 'hide'
2018-12-24 09:28:44 +01:00
for expected_path in expected_actions:
self.temp.setdefault(expected_path, {})['hidden'] = hidden
if 'value' in self.temp.get(expected_path, {}):
value = self.temp[expected_path]['value']
else:
value = self.model[expected_path].get('value')
self.set_dependencies(expected_path, value, hidden)
2018-12-24 09:28:44 +01:00
def set_not_equal(self,
path: str,
value: Any) -> None:
not_equal = self.form.get(path, {}).get('not_equal', {})
if not_equal:
vals = []
opts = []
if isinstance(value, list):
for val in value:
vals.append(val)
opts.append(path)
else:
vals.append(value)
opts.append(path)
for path_ in self.form[path]['not_equal']['options']:
schema = self.get_schema(path_)
if not schema.get('isMulti', False):
default_value = None
else:
default_value = []
p_value = self.model[path_].get('value', default_value)
if isinstance(p_value, list):
for val in p_value:
vals.append(val)
opts.append(path_)
else:
vals.append(p_value)
opts.append(path_)
equal = []
is_current = False
warnings_only = self.form[path]['not_equal'].get('warnings', False)
if warnings_only:
msg = _('should be different from the value of "{}"')
msgcurr = _('value for {} should be different')
else:
msg = _('must be different from the value of "{}"')
msgcurr = _('value for {} must be different')
for idx_inf, val_inf in enumerate(vals):
for idx_sup, val_sup in enumerate(vals[idx_inf + 1:]):
if val_inf == val_sup is not None:
for opt_ in [opts[idx_inf], opts[idx_inf + idx_sup + 1]]:
if opt_ == path:
is_current = True
if opt_ not in equal:
equal.append(opt_)
if equal:
equal_name = {}
for opt in equal:
schema = self.get_schema(opt)
equal_name[opt] = schema['title']
len_equal_name = len(equal)
for opt_ in equal:
display_equal = []
for opt__ in equal:
if opt_ != opt__:
display_equal.append(equal_name[opt_])
display_equal = ', '.join(display_equal)
if opt_ == path:
msg_ = msgcurr.format(display_equal)
else:
msg_ = msg.format(display_equal)
if warnings_only:
self.model[opt_].setdefault('warnings', []).append(msg_)
else:
self.model[opt_].setdefault('error', []).append(msg_)
else:
for opt in opts:
if 'warnings' in self.model[opt]:
del self.model[opt]['warnings']
if 'error' in self.model[opt]:
del self.model[opt]['error']
def do_copy(self,
path: str,
value: Any) -> None:
copy = self.form.get(path, {}).get('copy')
if copy:
for opt in copy:
owner = self.temp.get(opt, {}).get('owner')
if owner is None:
owner = self.model[opt]['owner']
if owner == 'default':
self.model[opt]['value'] = value
def send_data(self,
updates):
raise NotImplementedError('please implement send_data function')