tiramisu-api-python/tiramisu_json_api/api.py

764 lines
28 KiB
Python

from typing import Optional, Dict, List, Any
import warnings
import re
from .error import APIError, ValueWarning, ValueOptionError, ValueErrorWarning, PropertiesOptionError
from .setting import undefined
from .i18n import _
# Schema type name -> Python type that TiramisuOptionValue.set() requires a
# value to be an instance of.
TYPE = dict(boolean=bool, number=int, string=str)
class Option:
    """Fake option (IntOption, StrOption, ...) that only carries a name and a path."""

    def __init__(self, name, path):
        self.name, self.path = name, path

    def __call__(self):
        # Calling the fake option hands back the option itself.
        return self

    def impl_get_display_name(self):
        """Return the name given at construction."""
        return self.name

    def impl_getpath(self):
        """Return the path given at construction."""
        return self.path
class TiramisuOptionOption:
    """Static description of one schema node: ``config.option(path).option``.

    Args:
        path: full dotted path of the node.
        schema: the schema node for that path.
        model: the whole model mapping, keyed by option path.
    """

    # Schema "type" value -> type name returned by type().
    _TYPE_NAMES = {'number': 'int',
                   'choice': 'choice',
                   'boolean': 'bool',
                   'password': 'password',
                   'date': 'date',
                   'domain': 'domainname',
                   'url': 'url',
                   'username': 'username',
                   'string': 'str'}

    def __init__(self,
                 path: str,
                 schema: Dict,
                 model: Dict) -> None:
        self._path = path
        self.schema = schema
        self.model = model

    def doc(self):
        """Return the schema ``title``."""
        return self.schema['title']

    def path(self):
        """Return the full dotted path."""
        return self._path

    def name(self):
        """Return the last component of the dotted path."""
        return self._path.rsplit('.', 1)[-1]

    def isoptiondescription(self):
        """Return True when the schema type is ``object`` or ``array``."""
        return self.schema['type'] in ['object', 'array']

    def ismasterslaves(self):
        """Return True when the schema type is ``array``."""
        return self.schema['type'] == 'array'

    def issymlinkoption(self) -> bool:
        # FIXME
        return False

    def ismulti(self) -> bool:
        """Return the schema ``isMulti`` flag (False when absent)."""
        return self.schema.get('isMulti', False)

    def type(self) -> str:
        """Return the type name for this node.

        Raises:
            Exception: when the schema type has no known mapping.
        """
        if self.ismasterslaves():
            return 'masterslaves'
        if self.isoptiondescription():
            return 'optiondescription'
        schema_type = self.schema['type']
        if schema_type in self._TYPE_NAMES:
            return self._TYPE_NAMES[schema_type]
        raise Exception('unsupported type {}'.format(schema_type))

    def properties(self) -> List[str]:
        """Return the properties derived from the model flags of this path.

        A path that has no model entry yields an empty list, matching the
        ``model.get(path, {})`` convention used by the other readers.
        """
        model = self.model.get(self._path, {})
        props = []
        if model.get('required'):
            # FIXME 'empty', 'needs_len'
            props.append('mandatory')
        if model.get('readOnly'):
            props.append('frozen')
        if model.get('hidden'):
            props.append('hidden')
        return props

    def requires(self) -> None:
        # FIXME
        return None
class TiramisuOptionProperty:
    """Property accessor: ``config.option(path).property``.

    ``model`` is the model entry of a single option (possibly empty).
    """

    def __init__(self, model: Dict) -> None:
        self.model = model

    def get(self, only_raises=False):
        """Return the list of properties derived from the model entry.

        With ``only_raises`` set, the stored ``properties`` list and the
        ``required``/``readOnly`` flags are ignored; only ``hidden`` can be
        reported.
        """
        # FIXME if slave:
        #if not isslave and childapi.option.ismulti():
        #    if 'empty' in props:
        #        obj['required'] = True
        #        props.remove('empty')
        #    if 'mandatory' in props:
        #        obj['needs_len'] = True
        #        props.remove('mandatory')
        entry = self.model
        if only_raises:
            properties = []
        else:
            # slice-copy so the list stored in the model is never mutated
            properties = entry.get('properties', [])[:]
            for flag, prop in (('required', 'mandatory'), ('readOnly', 'frozen')):
                if entry.get(flag, False):
                    properties.append(prop)
        if entry.get('hidden', False):
            properties.append('hidden')
        return properties
class _Value:
    """Value-reading helpers shared by the classes that derive from it.

    The class has no ``__init__``: its methods read ``self.schema``,
    ``self.model``, ``self.temp`` and ``self.path``, which subclasses set.
    """

    def _dict_walk(self,
                   ret: Dict,
                   schema: Dict,
                   root: str,
                   fullpath: bool,
                   withwarning: bool):
        """Fill *ret* with ``{path: value}`` for each visible option below *schema*.

        ``object`` nodes are walked recursively. ``root`` and ``fullpath`` are
        only forwarded to the recursive call; they do not affect the result.
        """
        for key, option in schema['properties'].items():
            temp_hidden = self.temp.get(key, {}).get('hidden', None)
            entry = self.model.get(key, {})
            model_visible = not entry.get('hidden', False) and entry.get('display', True)
            # an explicit temp flag overrides whatever the model says
            if not (temp_hidden is False or (temp_hidden is None and model_visible)):
                continue
            if option['type'] == 'object':
                self._dict_walk(ret,
                                option,
                                root,
                                fullpath,
                                withwarning)
            else:
                # pass the option's own schema (not the parent's) so that the
                # isMulti default of [] is applied, as in TiramisuOptionValue.get()
                value = self._get_value(key, option)
                self._display_warnings(key, value, option['type'], option['name'], withwarning)
                ret[key] = value

    def dict(self,
             fullpath: bool=False,
             withwarning: bool=False):
        """Return ``{path: value}`` for the visible options under ``self.schema``."""
        ret = {}
        self._dict_walk(ret,
                        self.schema,
                        self.path,
                        fullpath,
                        withwarning)
        return ret

    def _display_warnings(self, path, value, type, name, withwarning=True):
        """Emit Python warnings for the errors/warnings stored in the model for *path*.

        Errors are always emitted; warnings only when *withwarning* is true.
        """
        entry = self.model.get(path, {})
        category = self.__class__.__name__
        if entry.get('error'):
            for err in entry.get('error'):
                warnings.warn_explicit(ValueOptionError(value,
                                                        type,
                                                        Option(name, path),
                                                        '{0}'.format(err)),
                                       ValueErrorWarning,
                                       category, 0)
        if withwarning and entry.get('warnings'):
            for warn in entry.get('warnings'):
                warnings.warn_explicit(ValueWarning('{0}'.format(warn),
                                                    Option(name, path)),
                                       ValueWarning,
                                       category, 0)

    def _get_value(self,
                   path: str,
                   schema: Dict) -> Any:
        """Return the current value of *path*.

        A value stored in ``self.temp`` wins over the model value. A ``None``
        value becomes ``[]`` when *schema* is flagged ``isMulti``.
        """
        temp_entry = self.temp.get(path, {})
        if 'value' in temp_entry:
            value = temp_entry['value']
        else:
            value = self.model.get(path, {}).get('value')
        if value is None and schema.get('isMulti', False):
            value = []
        return value
class TiramisuOptionOwner:
    """Owner accessor: ``config.option(path).owner``."""

    def __init__(self,
                 config: 'Config',
                 schema: Dict,
                 model: Dict,
                 form: Dict,
                 temp: Dict,
                 path: str,
                 index: Optional[int]) -> None:
        self.config = config
        self.schema = schema
        self.model = model
        self.form = form
        self.temp = temp
        self.path = path
        self.index = index

    def isdefault(self) -> bool:
        """Return True when the owner of the option is ``'default'``.

        The owner stored in ``temp`` wins over the one stored in ``model``;
        when neither records an owner the option counts as default.
        """
        temp_entry = self.temp.get(self.path, {})
        if 'owner' in temp_entry:
            owner = temp_entry['owner']
        else:
            # a temp entry may exist without an owner (e.g. it only carries a
            # 'hidden' flag), so fall back to the model instead of failing
            owner = self.model.get(self.path, {}).get('owner', 'default')
        return owner == 'default'
class TiramisuOptionValue(_Value):
    """Value accessor: ``config.option(path).value``."""

    def __init__(self,
                 config: 'Config',
                 schema: Dict,
                 model: Dict,
                 form: Dict,
                 temp: Dict,
                 path: str,
                 index: Optional[int]) -> None:
        self.config = config
        self.schema = schema
        self.model = model
        self.form = form
        self.temp = temp
        self.path = path
        self.index = index

    def get(self) -> Any:
        """Return the current value and emit the warnings/errors stored for it."""
        value = self._get_value(self.path, self.schema)
        self._display_warnings(self.path, value, self.schema['type'], self.schema['name'])
        return value

    def list(self):
        """Return the schema ``enum`` list."""
        return self.schema['enum']

    def set(self, value):
        """Validate *value* against the schema type and record the change.

        Raises:
            Exception: when a ``choice`` value is not in ``enum``, or when the
                value is not an instance of the expected Python type.
        """
        schema_type = self.schema['type']
        if schema_type == 'choice':
            if value not in self.schema['enum']:
                raise Exception('value {} is not in {}'.format(value, self.schema['enum']))
        # Schema types without a TYPE entry ('password', 'url', ...) are
        # checked as strings instead of failing with a bare KeyError.
        # NOTE(review): assumes every unmapped schema type is string-valued - confirm.
        elif not isinstance(value, TYPE.get(schema_type, str)):
            raise Exception('value {} is not a valid {} '.format(value, schema_type))
        self.config.modify_value(self.path,
                                 self.index,
                                 value,
                                 self.form.get(self.path, {}).get('remote', False))
        self._display_warnings(self.path, value, self.schema['type'], self.schema['name'])

    def reset(self):
        """Ask the config to delete the value of this option."""
        self.config.delete_value(self.path,
                                 self.index,
                                 self.form.get(self.path, {}).get('remote', False))
class _Option:
    """Listing helper shared by the classes that derive from it.

    Reads ``self.schema``, ``self.model``, ``self.temp``, ``self.form``,
    ``self.config`` and ``self.index``, which subclasses set.
    """

    def list(self,
             type='option'):
        """Yield a wrapper for every visible child of ``self.schema``.

        ``type='option'`` skips ``object``/``array`` children; ``type='all'``
        yields them as ``TiramisuOptionDescription``. Being a generator, the
        argument checks only run on first iteration.

        Raises:
            NotImplementedError: for any other *type*.
            Exception: when the schema has no ``properties``.
        """
        if type not in ('all', 'option'):
            raise NotImplementedError()
        children = self.schema.get('properties')
        if children is None:
            raise Exception(list(self.schema.keys()))
        containers = ('object', 'array')
        for path, schema in children.items():
            if type != 'all' and schema['type'] in containers:
                continue
            temp_hidden = self.temp.get(path, {}).get('hidden', None)
            entry = self.model.get(path, {})
            model_hidden = not entry.get('hidden', False) and entry.get('display', True)
            # an explicit temp flag overrides whatever the model says
            visible = temp_hidden is False or (temp_hidden is None and model_hidden is True)
            if not visible:
                continue
            shared = (self.config, schema, self.model, self.form, self.temp, path)
            if schema['type'] in containers:
                yield TiramisuOptionDescription(*shared)
            else:
                yield TiramisuOption(*shared, self.index)
class TiramisuOptionDescription(_Option):
    """``config.option(path)`` when *path* designates an option description."""

    def __init__(self,
                 config: 'Config',
                 schema: Dict,
                 model: Dict,
                 form: Dict,
                 temp: Dict,
                 path: str) -> None:
        self.config = config
        self.schema = schema
        self.model = model
        self.form = form
        self.temp = temp
        self.path = path
        self.index = None

    def __getattr__(self,
                    subfunc: str) -> Any:
        """Resolve the ``option`` and ``property`` sub functions.

        Raises:
            APIError: for any other attribute name.
        """
        if subfunc == 'option':
            return TiramisuOptionOption(self.path,
                                        self.schema,
                                        self.model)
        if subfunc == 'property':
            return TiramisuOptionProperty(self.model.get(self.path, {}))
        raise APIError(_('please specify a valid sub function ({})').format(subfunc))

    def group_type(self):
        """Return the group type of this option description.

        Raises:
            PropertiesOptionError: when the option description is hidden.
        """
        # look the flags up under this description's own path
        temp_hidden = self.temp.get(self.path, {}).get('hidden', None)
        entry = self.model.get(self.path, {})
        model_visible = not entry.get('hidden', False) and entry.get('display', True)
        # an explicit temp flag overrides whatever the model says
        if temp_hidden is False or (temp_hidden is None and model_visible):
            # FIXME
            return 'default'
        raise PropertiesOptionError(None, None, None, opt_type='optiondescription')
class TiramisuOption:
    """``config.option(path)`` when *path* designates an option."""

    def __init__(self,
                 config: 'Config',
                 schema: Dict,
                 model: Dict,
                 form: Dict,
                 temp: Dict,
                 path: str,
                 index: Optional[int]) -> None:
        self.config = config
        self.schema = schema
        self.model = model
        self.form = form
        self.temp = temp
        self.path = path
        self.index = index

    def __getattr__(self,
                    subfunc: str) -> Any:
        """Resolve the ``option``, ``value``, ``owner`` and ``property`` sub functions.

        Raises:
            NotImplementedError: for ``option`` or ``property`` when an index
                was given.
            APIError: for any other attribute name.
        """
        if subfunc == 'option':
            if self.index is not None:
                raise NotImplementedError()
            return TiramisuOptionOption(self.path,
                                        self.schema,
                                        self.model)
        if subfunc == 'value':
            return TiramisuOptionValue(self.config,
                                       self.schema,
                                       self.model,
                                       self.form,
                                       self.temp,
                                       self.path,
                                       self.index)
        if subfunc == 'owner':
            return TiramisuOptionOwner(self.config,
                                       self.schema,
                                       self.model,
                                       self.form,
                                       self.temp,
                                       self.path,
                                       self.index)
        if subfunc == 'property':
            if self.index is not None:
                raise NotImplementedError()
            return TiramisuOptionProperty(self.model.get(self.path, {}))
        raise APIError(_('please specify a valid sub function ({})').format(subfunc))
class TiramisuContextProperty:
    """Context-level property accessor: ``config.property``."""

    # FIXME ?
    _PROPERTIES = ('demoting_error_warning',)

    def get(self):
        """Return a new list holding the fixed context properties."""
        return [prop for prop in self._PROPERTIES]
class ContextOption(_Option):
    """Option accessor of the whole configuration: ``config.option``."""

    def __init__(self,
                 config: 'Config',
                 model: Dict,
                 form: Dict,
                 schema: Dict,
                 temp: Dict) -> None:
        self.config, self.model, self.form, self.temp = config, model, form, temp
        # wrap the top-level schema so it looks like any other schema node
        self.schema = {'properties': schema}
        self.index = None

    def __call__(self,
                 path: str,
                 index: Optional[int]=None) -> TiramisuOption:
        """Return the wrapper for *path*.

        ``object``/``array`` nodes get a ``TiramisuOptionDescription`` (the
        index is not used for them); anything else gets a ``TiramisuOption``.
        """
        node = self.config.get_schema(path)
        shared = (self.config, node, self.model, self.form, self.temp, path)
        if node['type'] in ('object', 'array'):
            return TiramisuOptionDescription(*shared)
        return TiramisuOption(*shared, index)
class ContextValue(_Value):
    """Value accessor of the whole configuration: ``config.value``."""

    def __init__(self,
                 config: 'Config',
                 model: Dict,
                 form: Dict,
                 schema: Dict,
                 temp: Dict) -> None:
        self.config = config
        self.model = model
        self.form = form
        first = next(iter(schema.keys()))
        # everything before the last dot of the first schema key
        self.path = first.rsplit('.', 1)[0]
        # wrap the top-level schema so it looks like any other schema node
        self.schema = {'properties': schema}
        self.temp = temp

    def __call__(self) -> TiramisuOptionValue:
        """Return a ``TiramisuOptionValue`` bound to the context path, without index."""
        # NOTE(review): the original referenced undefined ``path``/``index``
        # names; the context path and no index are assumed here - confirm.
        return TiramisuOptionValue(self.config,
                                   self.schema,
                                   self.model,
                                   self.form,
                                   self.temp,
                                   self.path,
                                   None)
class Config:
    """Client-side view of a configuration described by a JSON document.

    ``json`` must provide three entries:

    * ``model``: iterable of dicts, each with a ``key`` (the option path);
    * ``form``: iterable of dicts; entries with a ``key`` are indexed by it
      and an optional ``pattern`` string is compiled to a regex;
    * ``schema``: mapping of top-level paths to schema nodes.

    Local (non remote) changes are kept in ``self.temp``; every change is
    also queued in ``self.updates`` until it is sent with ``send_data``.
    """

    def __init__(self,
                 json):
        self.model_ori = json['model']
        self.model = {option['key']: option for option in json['model']}
        self.form = {}
        for option in json['form']:
            if option.get('key'):
                if 'pattern' in option:
                    # compiled once here; test_value() calls .search() on it
                    option['pattern'] = re.compile(option['pattern'])
                self.form[option['key']] = option
        self.temp = {}
        self.schema = json['schema']
        self.updates = []
        # an empty schema simply means "no root" instead of StopIteration
        first_path = next(iter(self.schema), '')
        self.root = first_path.rsplit('.', 1)[0] if '.' in first_path else ''

    def __getattr__(self,
                    subfunc: str) -> Any:
        """Resolve the ``property``, ``option`` and ``value`` sub functions.

        Raises:
            APIError: for any other attribute name.
        """
        if subfunc == 'property':
            return TiramisuContextProperty()
        if subfunc == 'option':
            return ContextOption(self,
                                 self.model,
                                 self.form,
                                 self.schema,
                                 self.temp)
        if subfunc == 'value':
            return ContextValue(self,
                                self.model,
                                self.form,
                                self.schema,
                                self.temp)
        raise APIError(_('please specify a valid sub function ({})').format(subfunc))

    def add_value(self,
                  path: str,
                  index: Optional[int],
                  value: Any,
                  remote: bool) -> None:
        """Queue an ``add`` update for *path*."""
        self.updates_value('add',
                           path,
                           index,
                           value,
                           remote,
                           None)

    def modify_value(self,
                     path: str,
                     index: Optional[int],
                     value: Any,
                     remote: bool) -> None:
        """Queue a ``modify`` update for *path*.

        When *value* is a list ending with ``undefined``, that last item is
        replaced in place by the schema ``defaultvalue`` or, failing that, by
        the item at the same position in the schema ``value`` (``None`` when
        neither is available).
        """
        schema = self.get_schema(path)
        if value and isinstance(value, list) and value[-1] is undefined:
            new_value = schema.get('defaultvalue')
            if new_value is None:
                # the schema may have no 'value' at all
                schema_values = schema.get('value') or []
                if len(schema_values) >= len(value):
                    new_value = schema_values[len(value) - 1]
            value[-1] = new_value
        self.updates_value('modify',
                           path,
                           index,
                           value,
                           remote,
                           None)

    def delete_value(self,
                     path: str,
                     index: Optional[int],
                     remote: bool) -> None:
        """Queue a ``delete`` update for *path*, carrying the schema value."""
        schema = self.get_schema(path)
        value = schema.get('value')
        if value is None and schema.get('isMulti', False):
            value = []
        self.updates_value('delete',
                           path,
                           index,
                           value,
                           remote,
                           None)

    def get_schema(self,
                   path):
        """Return the schema node of *path*.

        Schema keys are full dotted paths, so the lookup key grows by one
        component at each level, starting after ``self.root``.
        """
        root_path = self.root
        schema = {'properties': self.schema}
        if root_path:
            subpaths = path.split('.')[len(root_path.split('.')):]
        else:
            subpaths = path.split('.')
        for subpath in subpaths:
            root_path = root_path + '.' + subpath if root_path else subpath
            schema = schema['properties'][root_path]
        return schema

    def updates_value(self,
                      action: str,
                      path: str,
                      index: Optional[int],
                      value: Optional[Any],
                      remote: bool,
                      masterslaves: Optional[str]) -> None:
        """Record an update, merging it with the last queued one when possible.

        After queuing, the value is checked against the form ``pattern`` (if
        any). When the check passes, a remote change sends every queued
        update with ``send_data`` and replaces the model with the answer; a
        local change is stored in ``self.temp`` and the dependencies,
        ``not_equal`` and ``copy`` rules of the form are applied.
        """
        update_last_action = False
        last_body = None
        if self.updates:
            last_body = self.updates[-1]
            if last_body['name'] == path:
                if index is None and 'index' not in last_body:
                    last_action = last_body['action']
                    if last_action == action or \
                            last_action in ['delete', 'modify'] and action in ['delete', 'modify']:
                        update_last_action = True
                elif index is None and action == 'delete':
                    # drop the trailing updates made obsolete by this delete
                    # NOTE(review): with masterslaves set, the test ignores the
                    # queued update's own name - confirm this is intended.
                    while self.updates:
                        update = self.updates[-1]
                        if masterslaves is None and update['name'] == path or \
                                masterslaves and path.startswith(masterslaves + '.'):
                            self.updates.pop()
                        else:
                            break
                # .get(): the previous update may have been recorded without index
                elif last_body.get('index') == index:
                    if last_body['action'] == 'add' and action == 'modify':
                        action = 'add'
                        update_last_action = True
                    elif last_body['action'] == action and action != 'delete' or \
                            last_body['action'] == 'modify' and action == 'delete':
                        update_last_action = True
                    elif last_body['action'] == 'add' and action == 'delete':
                        del self.updates[-1]

        if update_last_action:
            if value is None:
                if 'value' in last_body:
                    del last_body['value']
            else:
                last_body['value'] = value
            if index is None and 'index' in last_body:
                del last_body['index']
            last_body['action'] = action
        else:
            data = {'action': action,
                    'name': path}
            if value is not None:
                data['value'] = value
            if index is not None:
                data['index'] = index
            self.updates.append(data)

        if 'pattern' in self.form.get(path, {}) and (not isinstance(value, list) or undefined not in value):
            match = self.test_value(path, value, remote)
        else:
            match = True
        if match:
            if remote:
                ret = self.send_data({'updates': self.updates,
                                      'model': self.model_ori})
                self.updates = []
                self.temp.clear()
                # FIXME remove old key ?
                for model in ret['model']:
                    self.model[model['key']] = model
                self.model_ori = ret['model']
            else:
                self.temp.setdefault(path, {})['owner'] = 'tmp'
                self.temp[path]['value'] = value
                self.set_dependencies(path, value)
                self.set_not_equal(path, value)
                self.do_copy(path, value)

    def test_value(self,
                   path: str,
                   value: Any,
                   remote: bool):
        """Check *value* against the compiled form ``pattern`` of *path*.

        A list is valid only when each of its items is valid; ``None`` is
        always valid. For local changes the result is mirrored in
        ``self.temp[path]['error']``. The return value is truthy on success
        (``True`` or a match object) and falsy on failure.
        """
        if isinstance(value, list):
            for val in value:
                if not self.test_value(path, val, remote):
                    return False
            return True
        if value is None:
            match = True
        else:
            match = self.form[path]['pattern'].search(value)
        if not remote:
            if not match:
                self.temp.setdefault(path, {})['error'] = ['']
            elif 'error' in self.temp.get(path, {}):
                # the flag lives in temp, so that is where it must be looked up
                del self.temp[path]['error']
        return match

    def set_dependencies(self,
                         path: str,
                         value: Any,
                         force_hide: bool=False) -> None:
        """Apply the hide/show ``dependencies`` of *path* for *value*.

        The resulting ``hidden`` flag is stored in ``self.temp`` for each
        target path and propagated recursively to the targets' own
        dependencies; a hidden option forces its targets to be hidden too.
        """
        dependencies = self.form.get(path, {}).get('dependencies', {})
        if not dependencies:
            return
        if value in dependencies['expected']:
            expected = dependencies['expected'][value]
        else:
            expected = dependencies['default']
        for action in ['hide', 'show']:
            expected_actions = expected.get(action)
            if not expected_actions:
                continue
            hidden = True if force_hide else action == 'hide'
            for expected_path in expected_actions:
                self.temp.setdefault(expected_path, {})['hidden'] = hidden
                if 'value' in self.temp.get(expected_path, {}):
                    target_value = self.temp[expected_path]['value']
                else:
                    target_value = self.model.get(expected_path, {}).get('value')
                self.set_dependencies(expected_path, target_value, hidden)

    def set_not_equal(self,
                      path: str,
                      value: Any) -> None:
        """Apply the ``not_equal`` rule of *path* for *value*.

        The value(s) of *path* are compared with the model values of the
        options listed in the rule. Each option sharing a non-``None`` value
        with another one gets a message naming the other conflicting options,
        stored under ``warnings`` or ``error`` in its model entry. Without
        any conflict, the ``warnings`` and ``error`` entries of every option
        involved are removed.
        """
        not_equal = self.form.get(path, {}).get('not_equal', {})
        if not not_equal:
            return
        # vals[i] is a value held by option opts[i]
        vals = []
        opts = []
        for val in value if isinstance(value, list) else [value]:
            vals.append(val)
            opts.append(path)
        for path_ in not_equal['options']:
            default_value = [] if self.get_schema(path_).get('isMulti', False) else None
            p_value = self.model[path_].get('value', default_value)
            for val in p_value if isinstance(p_value, list) else [p_value]:
                vals.append(val)
                opts.append(path_)

        # options holding a value that appears more than once
        equal = []
        for idx_inf, val_inf in enumerate(vals):
            for idx_sup, val_sup in enumerate(vals[idx_inf + 1:], idx_inf + 1):
                if val_inf == val_sup is not None:
                    for opt_ in [opts[idx_inf], opts[idx_sup]]:
                        if opt_ not in equal:
                            equal.append(opt_)

        if not equal:
            for opt in opts:
                if 'warnings' in self.model[opt]:
                    del self.model[opt]['warnings']
                if 'error' in self.model[opt]:
                    del self.model[opt]['error']
            return

        warnings_only = not_equal.get('warnings', False)
        if warnings_only:
            msg = _('should be different from the value of "{}"')
            msgcurr = _('value for {} should be different')
        else:
            msg = _('must be different from the value of "{}"')
            msgcurr = _('value for {} must be different')
        equal_name = {opt: self.get_schema(opt)['title'] for opt in equal}
        for opt_ in equal:
            # name the *other* conflicting options, not the option itself
            display_equal = ', '.join(equal_name[opt__] for opt__ in equal if opt_ != opt__)
            if opt_ == path:
                msg_ = msgcurr.format(display_equal)
            else:
                msg_ = msg.format(display_equal)
            if warnings_only:
                self.model[opt_].setdefault('warnings', []).append(msg_)
            else:
                self.model[opt_].setdefault('error', []).append(msg_)

    def do_copy(self,
                path: str,
                value: Any) -> None:
        """Copy *value* into each ``copy`` target of *path* that is still default.

        The owner is read from ``self.temp`` first, then from the model; a
        target without any recorded owner counts as ``'default'``.
        """
        copy = self.form.get(path, {}).get('copy')
        if copy:
            for opt in copy:
                owner = self.temp.get(opt, {}).get('owner')
                if owner is None:
                    owner = self.model[opt].get('owner', 'default')
                if owner == 'default':
                    self.model[opt]['value'] = value

    def send_data(self,
                  updates):
        """Send the queued updates; must be implemented by subclasses.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError('please implement send_data function')