# tiramisu-api-python/tiramisu_json_api/api.py
#
# 938 lines
# 35 KiB
# Python

from typing import Optional, Dict, List, Any
from copy import copy
import warnings
import re
from .error import APIError, ValueWarning, ValueOptionError, ValueErrorWarning, PropertiesOptionError
from .setting import undefined
from .i18n import _
# Version of the tiramisu-json exchange format accepted by Config.
TIRAMISU_JSON_VERSION = '1.0'

# Python type that a value must be an instance of, keyed by schema option type.
TYPE = {
    'boolean': bool,
    'integer': int,
    'string': str,
    'password': str,
    'domain': str,
}
class Option:
    """Fake option (IntOption, StrOption, ...) that only carries a path.

    Only useful as the option argument of the warnings emitted by this module.
    """

    def __init__(self, path):
        self.path = path

    def __call__(self):
        # The consumer is supposed to receive a weakref: calling it yields the option.
        return self

    def impl_getpath(self):
        """Return the path this fake option was built with."""
        return self.path
class TiramisuOptionOption:
    """Static description of an option: ``config.option(path).option``.

    Wraps the schema entry of ``path`` and answers questions about its
    name, type and place in a leadership.
    """

    def __init__(self,
                 config: 'Config',
                 path: str,
                 schema: Dict,
                 model: Dict,
                 form: Dict) -> None:
        self.config = config
        self._path = path
        self.schema = schema
        self.model = model
        self.form = form

    def doc(self):
        """Return the schema title of the option."""
        return self.schema['title']

    def path(self):
        """Return the full path of the option."""
        return self._path

    def name(self,
             follow_symlink: bool=False) -> str:
        """Return the last component of the path.

        With ``follow_symlink`` set and a symlink option, the name is taken
        from the schema ``opt_path`` instead of the option's own path.
        """
        if not follow_symlink or \
                self.isoptiondescription() or \
                not self.issymlinkoption():
            path = self._path
        else:
            path = self.schema['opt_path']
        return path.rsplit('.', 1)[-1]

    def isoptiondescription(self):
        """Tell whether the schema type is ``object`` or ``array``."""
        return self.schema['type'] in ['object', 'array']

    def isleadership(self):
        """Tell whether the schema type is ``array``."""
        return self.schema['type'] == 'array'

    def isleader(self):
        """Tell whether the option is the first property of an ``array`` parent."""
        if '.' in self._path:
            parent_schema = self.config.get_schema(self._path.rsplit('.', 1)[0])
            if parent_schema['type'] == 'array':
                leader = next(iter(parent_schema['properties'].keys()))
                return leader == self._path
        return False

    def isfollower(self):
        """Delegate to ``config.isfollower`` for this path."""
        return self.config.isfollower(self._path)

    def issymlinkoption(self) -> bool:
        """Tell whether the schema type is ``symlink``."""
        return self.schema['type'] == 'symlink'

    def ismulti(self) -> bool:
        """Return the schema ``isMulti`` flag (False when absent)."""
        return self.schema.get('isMulti', False)

    def type(self) -> str:
        """Return ``leadership``, ``optiondescription`` or the raw schema type."""
        if self.isleadership():
            return 'leadership'
        if self.isoptiondescription():
            return 'optiondescription'
        return self.schema['type']

    def properties(self) -> List[str]:
        """Return the properties computed by the config for this option."""
        # get_properties() reads 'properties', 'required', ... directly on the
        # mapping it receives, so it must be given this option's own model
        # entry and not the whole model.
        model = self.model.get(self._path, {})
        return self.config.get_properties(model, self._path, None)

    def requires(self) -> None:
        # FIXME
        return None
class TiramisuOptionProperty:
    """Properties of an option: ``config.option(path).property``."""

    def __init__(self,
                 config: 'Config',
                 path: str,
                 index: Optional[int],
                 model: Dict) -> None:
        self.config = config
        self.path = path
        self.index = index
        self.model = model

    def get(self, only_raises=False):
        """Return the list of properties of the option.

        With ``only_raises`` set, the result is limited to ``hidden`` (when
        the config reports the option as hidden); otherwise the config
        computes the full list.
        """
        if only_raises:
            raising = []
            if self.config.is_hidden(self.path, self.index):
                raising.append('hidden')
            return raising
        return self.config.get_properties(self.model,
                                          self.path,
                                          self.index,
                                          only_raises)
class _Value:
def _dict_walk(self,
ret: Dict,
schema: Dict,
root: str,
fullpath: bool,
withwarning: bool) -> None:
leadership_len = None
for key, option in schema['properties'].items():
if self.config.is_hidden(key, None) is False:
if option['type'] in ['object', 'array']:
# optiondescription or leadership
self._dict_walk(ret,
option,
root,
fullpath,
withwarning)
elif schema.get('type') == 'array' and leadership_len is not None:
# followers
values = []
for index in range(leadership_len):
value = self.config.get_value(key,
index)
self._display_warnings(key, value, option['type'], option['name'], withwarning)
values.append(value)
ret[key] = values
else:
value = self.config.get_value(key)
self._display_warnings(key, value, option['type'], option['name'], withwarning)
ret[key] = value
if schema.get('type') == 'array':
leadership_len = len(value)
elif schema.get('type') == 'array' and leadership_len is None:
# if leader is hidden, followers are hidden too
break
def dict(self,
fullpath: bool=False,
withwarning: bool=False):
ret = {}
self._dict_walk(ret,
self.schema,
self.path,
fullpath,
withwarning)
return ret
def _display_warnings(self, path, value, type, name, withwarning=True):
for err in self.model.get(path, {}).get('error', []):
warnings.warn_explicit(ValueErrorWarning(value,
type,
Option(path),
'{0}'.format(err)),
ValueErrorWarning,
self.__class__.__name__, 0)
if withwarning and self.model.get(path, {}).get('warnings'):
for warn in self.model.get(path, {}).get('warnings'):
warnings.warn_explicit(ValueErrorWarning(value,
type,
Option(path),
'{0}'.format(err)),
ValueErrorWarning,
self.__class__.__name__, 0)
class TiramisuOptionOwner:
    """Owner of an option value: ``config.option(path).owner``."""

    def __init__(self,
                 config: 'Config',
                 schema: Dict,
                 model: List[Dict],
                 form: List[Dict],
                 temp: List[Dict],
                 path: str,
                 index: int) -> None:
        self.config = config
        self.schema = schema
        self.model = model
        self.form = form
        self.temp = temp
        self.path = path
        self.index = index

    def isdefault(self) -> Any:
        """Tell whether the owner reported by the config is ``'default'``."""
        owner = self.config.get_owner(self.path, self.index)
        return owner == 'default'

    def get(self) -> str:
        """Return the owner reported by the config for this path and index."""
        owner = self.config.get_owner(self.path, self.index)
        return owner
class TiramisuOptionValue(_Value):
    """Value of an option: ``config.option(path).value``."""

    def __init__(self,
                 config: 'Config',
                 schema: Dict,
                 model: List[Dict],
                 form: List[Dict],
                 temp: List[Dict],
                 path: str,
                 index: int) -> None:
        self.config = config
        self.schema = schema
        self.model = model
        self.form = form
        self.temp = temp
        self.path = path
        self.index = index

    def get(self) -> Any:
        """Return the current value, emitting the warnings stored for the option.

        Raises APIError when an index is missing for a follower or given for
        any other option.
        """
        follower = self.config.isfollower(self.path)
        if follower and self.index is None:
            raise APIError(_('index must be set with the follower option "{}"').format(self.path))
        if not follower and self.index is not None:
            raise APIError(_('index must only be set with a follower option, not for "{}"').format(self.path))
        if follower:
            current = self.config.get_value(self.path, self.index)
        else:
            current = self.config.get_value(self.path)
        self._display_warnings(self.path, current, self.schema['type'], self.schema['name'])
        return current

    def list(self):
        """Return the schema ``enum`` list of the option."""
        return self.schema['enum']

    def _validate(self, type_, value):
        """Raise when ``value`` does not fit ``type_``; None and undefined always pass."""
        if value in [None, undefined]:
            return
        if type_ != 'choice':
            if not isinstance(value, TYPE[type_]):
                raise Exception('value {} is not a valid {} '.format(value, type_))
        elif value not in self.schema['enum']:
            raise Exception('value {} is not in {}'.format(value, self.schema['enum']))

    def set(self, value):
        """Validate ``value``, record the modification in the config and emit warnings."""
        type_ = self.schema['type']
        remote = self.form.get(self.path, {}).get('remote', False)
        whole_multi = self.index is None and self.schema.get('isMulti', False)
        if not whole_multi:
            self._validate(type_, value)
        else:
            # a multi option set without index receives its whole list
            if not isinstance(value, list):
                raise Exception('value must be a list')
            for item in value:
                self._validate(type_, item)
        self.config.modify_value(self.path,
                                 self.index,
                                 value,
                                 remote)
        self._display_warnings(self.path, value, type_, self.schema['name'])

    def reset(self):
        """Ask the config to delete the value of this option."""
        remote = self.form.get(self.path, {}).get('remote', False)
        self.config.delete_value(self.path,
                                 self.index,
                                 remote)

    def default(self):
        """Return the schema ``value`` entry (None when absent)."""
        return self.schema.get('value')
class _Option:
def list(self,
type='option'):
if type not in ['all', 'option', 'optiondescription']:
raise Exception('unknown list type {}'.format(type))
for path, schema in self.schema['properties'].items():
if not self.config.is_hidden(path, None):
if schema['type'] in ['object', 'array']:
if type in ['all', 'optiondescription']:
yield TiramisuOptionDescription(self.config,
schema,
self.model,
self.form,
self.temp,
path)
elif type in ['all', 'option']:
yield TiramisuOption(self.config,
schema,
self.model,
self.form,
self.temp,
path,
self.index)
class TiramisuOptionDescription(_Option):
    """Option description: ``config.option(path)`` when ``path`` is an
    ``object`` or ``array`` schema entry."""

    def __init__(self,
                 config: 'Config',
                 schema: Dict,
                 model: List[Dict],
                 form: List[Dict],
                 temp: List[Dict],
                 path: str) -> None:
        self.config = config
        self.schema = schema
        self.model = model
        self.form = form
        self.temp = temp
        self.path = path
        # an option description is never indexed
        self.index = None

    def __getattr__(self,
                    subfunc: str) -> Any:
        """Build the ``option``, ``property`` or ``value`` sub-API on demand.

        Raises APIError for any other attribute name.
        """
        if subfunc == 'option':
            return TiramisuOptionOption(self.config,
                                        self.path,
                                        self.schema,
                                        self.model,
                                        self.form)
        if subfunc == 'property':
            # TiramisuOptionProperty takes (config, path, index, model): the
            # index must be passed explicitly or the call fails with TypeError.
            return TiramisuOptionProperty(self.config,
                                          self.path,
                                          self.index,
                                          self.model.get(self.path, {}))
        if subfunc == 'value':
            return TiramisuOptionValue(self.config,
                                       self.schema,
                                       self.model,
                                       self.form,
                                       self.temp,
                                       self.path,
                                       self.index)
        raise APIError(_('please specify a valid sub function ({})').format(subfunc))

    def group_type(self):
        """Return the group type of a visible option description.

        Raises PropertiesOptionError when the description is hidden.
        """
        if not self.config.is_hidden(self.path, None):
            # FIXME
            return 'default'
        raise PropertiesOptionError(None, None, None, opt_type='optiondescription')
class TiramisuOption:
    """Option: ``config.option(path)`` when ``path`` is not an option description."""

    def __init__(self,
                 config: 'Config',
                 schema: Dict,
                 model: List[Dict],
                 form: List[Dict],
                 temp: List[Dict],
                 path: str,
                 index: Optional[int]) -> None:
        self.config = config
        self.schema = schema
        self.model = model
        self.form = form
        self.temp = temp
        self.path = path
        self.index = index

    def __getattr__(self,
                    subfunc: str) -> Any:
        """Build the ``option``, ``value``, ``owner`` or ``property`` sub-API on demand.

        Raises NotImplementedError for ``option`` when an index is set, and
        APIError for any other attribute name.
        """
        if subfunc == 'option':
            if self.index is not None:
                raise NotImplementedError()
            return TiramisuOptionOption(self.config,
                                        self.path,
                                        self.schema,
                                        self.model,
                                        self.form)
        if subfunc == 'value':
            return TiramisuOptionValue(self.config,
                                       self.schema,
                                       self.model,
                                       self.form,
                                       self.temp,
                                       self.path,
                                       self.index)
        if subfunc == 'owner':
            return TiramisuOptionOwner(self.config,
                                       self.schema,
                                       self.model,
                                       self.form,
                                       self.temp,
                                       self.path,
                                       self.index)
        if subfunc == 'property':
            return TiramisuOptionProperty(self.config,
                                          self.path,
                                          self.index,
                                          self.model.get(self.path, {}))
        raise APIError(_('please specify a valid sub function ({})').format(subfunc))
class TiramisuContextProperty:
    """Context properties: ``config.property``."""

    def get(self):
        """Return the fixed list of context properties."""
        # FIXME ?
        context_properties = ['demoting_error_warning']
        return context_properties
class ContextOption(_Option):
    """Entry point to options: ``config.option``."""

    def __init__(self,
                 config: 'Config',
                 model: Dict,
                 form: Dict,
                 schema: Dict,
                 temp: Dict) -> None:
        self.config = config
        self.model = model
        self.form = form
        # wrap the top-level schema so that it looks like any other sub-schema
        self.schema = {'properties': schema}
        self.temp = temp
        self.index = None

    def __call__(self,
                 path: str,
                 index: Optional[int]=None) -> TiramisuOption:
        """Return the API object for ``path``.

        An ``object``/``array`` schema entry gives a TiramisuOptionDescription
        (``index`` is then ignored); anything else gives a TiramisuOption.
        """
        schema = self.config.get_schema(path)
        if schema['type'] not in ['object', 'array']:
            return TiramisuOption(self.config,
                                  schema,
                                  self.model,
                                  self.form,
                                  self.temp,
                                  path,
                                  index)
        return TiramisuOptionDescription(self.config,
                                         schema,
                                         self.model,
                                         self.form,
                                         self.temp,
                                         path)
class ContextValue(_Value):
    """Values of the whole configuration: ``config.value``."""

    def __init__(self,
                 config: 'Config',
                 model: Dict,
                 form: Dict,
                 schema: Dict,
                 temp: Dict) -> None:
        self.config = config
        self.model = model
        self.form = form
        # the path is the first schema key without its last component
        first = next(iter(schema.keys()))
        self.path = first.rsplit('.', 1)[0]
        # wrap the top-level schema so that it looks like any other sub-schema
        self.schema = {'properties': schema}
        self.temp = temp

    def __call__(self) -> TiramisuOptionValue:
        """Return a TiramisuOptionValue bound to the context path, without index."""
        # The context has no option index; use the path computed in __init__
        # (the previous code referenced undefined ``path``/``index`` names).
        return TiramisuOptionValue(self.config,
                                   self.schema,
                                   self.model,
                                   self.form,
                                   self.temp,
                                   self.path,
                                   None)

    def mandatory(self):
        """Yield the path of each required option whose value is missing.

        A value is missing when it is None or, for an option whose own schema
        is ``isMulti``, when it contains None or an empty string.
        """
        for key, value in self.dict().items():
            if not self.model.get(key, {}).get('required'):
                continue
            if value is None:
                yield key
            # isMulti lives in the option's schema, not in the root wrapper
            elif self.config.get_schema(key).get('isMulti') and \
                    (None in value or '' in value):
                yield key
class Config:
    """Client-side configuration built from a tiramisu-json document.

    ``model`` holds the values received, ``schema`` the option tree, ``form``
    the per-path form helpers, ``temp`` the local not-yet-sent changes and
    ``updates`` the list of actions to send.
    """

    def __init__(self,
                 json):
        """Load ``json`` (keys ``version``, ``model``, ``form``, ``schema``).

        Raises Exception when the format version is not TIRAMISU_JSON_VERSION.
        """
        if json.get('version') != TIRAMISU_JSON_VERSION:
            raise Exception('incompatible tiramisu-json format version (got {}, expected {})'.format(json.get('version', '0.0'), TIRAMISU_JSON_VERSION))
        self.model = json['model']
        # Keep every per-path form entry (it used to be thrown away, which
        # disabled remote/pattern/dependencies/not_equal/copy handling).
        self.form = {}
        for key, option in json['form'].items():
            if key == 'null':
                continue
            if 'pattern' in option:
                # support pattern: compile once, on a copy of the entry
                option = dict(option)
                option['pattern'] = re.compile(option['pattern'])
            self.form[key] = option
        self.temp = {}
        self.schema = json['schema']
        self.updates = []
        # the root is the first schema key without its last component
        first_path = next(iter(self.schema), '')
        if '.' in first_path:
            self.root = first_path.rsplit('.', 1)[0]
        else:
            self.root = ''

    def __getattr__(self,
                    subfunc: str) -> Any:
        """Build the ``property``, ``option`` or ``value`` sub-API on demand.

        Raises APIError for any other missing attribute.
        """
        if subfunc == 'property':
            return TiramisuContextProperty()
        if subfunc == 'option':
            return ContextOption(self,
                                 self.model,
                                 self.form,
                                 self.schema,
                                 self.temp)
        if subfunc == 'value':
            return ContextValue(self,
                                self.model,
                                self.form,
                                self.schema,
                                self.temp)
        raise APIError(_('please specify a valid sub function ({})').format(subfunc))

    def add_value(self,
                  path: str,
                  index: Optional[int],
                  value: Any,
                  remote: bool) -> None:
        """Record an ``add`` action for ``path``."""
        self.updates_value('add',
                           path,
                           index,
                           value,
                           remote,
                           None)

    def modify_value(self,
                     path: str,
                     index: Optional[int],
                     value: Any,
                     remote: bool) -> None:
        """Record a ``modify`` action for ``path``.

        When ``value`` is a list ending with ``undefined``, that last item is
        replaced in place by the schema ``defaultvalue`` or, failing that, by
        the item at the same position in the schema ``value`` list.
        """
        schema = self.get_schema(path)
        if value and isinstance(value, list) and value[-1] is undefined:
            new_value = schema.get('defaultvalue')
            if new_value is None:
                len_value = len(value)
                schema_value = schema.get('value', [])
                if len(schema_value) >= len_value:
                    new_value = schema_value[len_value - 1]
            value[-1] = new_value
        self.updates_value('modify',
                           path,
                           index,
                           value,
                           remote,
                           None)

    def delete_value(self,
                     path: str,
                     index: Optional[int],
                     remote: bool) -> None:
        """Record a ``delete`` action for ``path``."""
        self.updates_value('delete',
                           path,
                           index,
                           None,
                           remote,
                           None)

    def get_properties(self,
                       model,
                       path,
                       index,
                       only_raises=True):
        """Return the property names derived from ``model``.

        ``model`` is the model entry of the option, not the whole model.
        """
        props = model.get('properties', [])[:]
        if model.get('required'):
            if self.get_schema(path).get('isMulti', False):
                props.append('empty')
            else:
                props.append('mandatory')
        if model.get('needs_len'):
            props.append('mandatory')
        if model.get('readOnly'):
            props.append('frozen')
        if only_raises and self.is_hidden(path,
                                          index):
            props.append('hidden')
        if self.form.get(path, {}).get('clearable'):
            props.append('clearable')
        return props

    def get_schema(self,
                   path):
        """Return the schema entry of ``path``, walking down from the root.

        Raises Exception when ``path`` does not start with the root, and
        KeyError when a component is unknown.
        """
        root_path = self.root
        schema = {'properties': self.schema,
                  'type': 'object'}
        if root_path:
            root = self.root.split('.')
            if not path.startswith(self.root):
                raise Exception('cannot find {0}'.format(path))
            subpaths = path.split('.')[len(root):]
        else:
            subpaths = path.split('.')
        for subpath in subpaths:
            # schema keys are full paths, so rebuild the path step by step
            if root_path:
                root_path += '.' + subpath
            else:
                root_path = subpath
            schema = schema['properties'][root_path]
        return schema

    def isfollower(self,
                   path: str) -> bool:
        """Tell whether ``path`` is a non-first property of an ``array`` parent."""
        if '.' in path:
            parent_schema = self.get_schema(path.rsplit('.', 1)[0])
            leader = next(iter(parent_schema['properties'].keys()))
            if parent_schema['type'] == 'array' and \
                    leader != path:
                return True
        return False

    def is_hidden(self,
                  path: str,
                  index: Optional[int]) -> bool:
        """Tell whether ``path`` (optionally at ``index``) is hidden.

        An option is hidden when ``hidden`` is True or ``display`` is False.
        A flag stored in ``temp`` for the path takes precedence over the model.
        """
        str_index = str(index)
        for property_, needs in {'hidden': True, 'display': False}.items():
            path_temp = self.temp.get(path, {})
            if property_ in path_temp:
                # local override (set by set_dependencies): honour it instead
                # of reading it and dropping the result
                if path_temp[property_] == needs:
                    return True
                continue
            path_model = self.model.get(path, {})
            if self.isfollower(path):
                # for a follower the index-independent flags are under 'null'
                if path_model.get('null', {}).get(property_, None) == needs:
                    return True
            elif path_model.get(property_, None) == needs:
                return True
            if str_index != 'None' and str_index in path_model and \
                    path_model.get(str_index, {}).get(property_, None) == needs:
                return True
        return False

    def get_value(self,
                  path: str,
                  index: Optional[int]=None) -> Any:
        """Return the value of ``path``, preferring ``temp`` over ``model``.

        Without index, a missing value of an ``isMulti`` option becomes ``[]``.
        With an index, a hidden follower gives a PropertiesOptionError
        instance (returned, not raised) and a missing value gives the schema
        ``default``.
        """
        if index is None:
            if 'value' in self.temp.get(path, {}):
                value = self.temp[path]['value']
            else:
                value = self.model.get(path, {}).get('value')
            if value is None and self.get_schema(path).get('isMulti', False):
                value = []
            return value
        index = str(index)
        path_temp = self.temp.get(path, {})
        if 'delete' in path_temp:
            value = None
        elif index in path_temp:
            if 'delete' in path_temp[index]:
                value = None
            else:
                value = path_temp
        else:
            value = self.model.get(path)
        # from here ``value`` is None or a mapping keyed by stringified index
        if self.isfollower(path):
            if self.is_hidden(path, index):
                value = PropertiesOptionError(None, None, None, opt_type='option')
            elif value is not None and index in value:
                value = value[index]['value']
            else:
                value = self.get_schema(path).get('default')
        else:
            if value is not None and index in value and 'value' in value[index]:
                value = value[index]['value']
            else:
                value = self.get_schema(path).get('default')
        return value

    def get_owner(self,
                  path: str,
                  index: int) -> str:
        """Return the owner of ``path`` (``'default'`` when none is stored).

        Raises PropertiesOptionError for a hidden follower.
        """
        if not self.isfollower(path):
            if 'owner' in self.temp.get(path, {}):
                owner = self.temp[path]['owner']
            else:
                owner = self.model.get(path, {}).get('owner', 'default')
        else:
            if 'value' in self.temp.get(path, {}):
                value = self.temp[path]
            else:
                value = self.model.get(path, {})
            index = str(index)
            if self.is_hidden(path, index):
                raise PropertiesOptionError(None, None, None, opt_type='option')
            if index in value:
                owner = value[index]['owner']
            else:
                owner = 'default'
        return owner

    def updates_value(self,
                      action: str,
                      path: str,
                      index: Optional[int],
                      value: Optional[Any],
                      remote: bool,
                      leadership: Optional[str]) -> None:
        """Queue ``action`` on ``path`` and apply it remotely or locally.

        The action is merged with the last queued update when both target the
        same path (and index) and are compatible; otherwise a new update is
        appended. When the value passes the form pattern, the updates are
        either sent (``remote``) or mirrored in ``temp``.
        """
        update_last_action = False
        if self.updates:
            last_body = self.updates[-1]
            if last_body['name'] == path:
                if index is None and 'index' not in last_body:
                    last_action = last_body['action']
                    if last_action == action or \
                            last_action in ['delete', 'modify'] and action in ['delete', 'modify']:
                        update_last_action = True
                elif index is None and action == 'delete':
                    # a full delete supersedes the trailing updates of the path
                    for update in reversed(self.updates):
                        if leadership is None and update['name'] == path or \
                                leadership and path.startswith(leadership + '.'):
                            del self.updates[-1]
                        else:
                            break
                # the last update may have no index at all (whole-value update)
                elif last_body.get('index') == index:
                    if last_body['action'] == 'add' and action == 'modify':
                        action = 'add'
                        update_last_action = True
                    elif last_body['action'] == action and action != 'delete' or \
                            last_body['action'] == 'modify' and action == 'delete':
                        update_last_action = True
                    elif last_body['action'] == 'add' and action == 'delete':
                        del self.updates[-1]
        if update_last_action:
            if action == 'delete' and value is None:
                if 'value' in last_body:
                    del last_body['value']
            else:
                last_body['value'] = value
            if index is None and 'index' in last_body:
                del last_body['index']
            last_body['action'] = action
        else:
            data = {'action': action,
                    'name': path}
            if action != 'delete' and value is not None:
                data['value'] = value
            if index is not None:
                data['index'] = index
            self.updates.append(data)
        if 'pattern' in self.form.get(path, {}) and (not isinstance(value, list) or undefined not in value):
            match = self.test_value(path, value, remote)
        else:
            match = True
        if not match:
            return
        if remote:
            self.updates_data(self.send_data({'updates': self.updates,
                                              'model': self.model}))
            return
        value = self._store_local(action, path, index, value)
        self.set_dependencies(path, value)
        self.set_not_equal(path, value)
        self.do_copy(path, value)

    def _store_local(self, action, path, index, value):
        """Mirror one action in ``temp`` and return the value now in effect."""
        if action != 'delete':
            if index is None:
                # set a value for a not follower option
                self.temp[path] = {'value': value, 'owner': 'tmp'}
            else:
                # set a value for a follower option
                self.temp.setdefault(path, {})[str(index)] = {'value': value, 'owner': 'tmp'}
            return value
        if index is None:
            # leader or standard option
            # set value to default value
            value = self.default_value(path)
            self.temp[path] = {'value': value, 'owner': 'default'}
            if self.option(path).option.isleader():
                # if leader, set follower to default value
                for follower in self._followers_of(path):
                    # delete all values
                    self.temp[follower] = {'delete': True}
        elif self.option(path).option.isleader():
            # if remove an indexed leader value
            # work on a copy: the list returned may be the one stored in the model
            old_value = list(self.option(path).value.get())
            old_value.pop(index)
            self.temp[path] = {'value': old_value, 'owner': 'tmp'}
            for follower in self._followers_of(path):
                # remove value for this index and reduce len
                # FIXME the length is not reduced!
                self.temp.setdefault(follower, {})[str(index)] = {'delete': True}
        else:
            # it's a follower with index
            self.temp.setdefault(path, {})[str(index)] = {'delete': True}
        return value

    def _followers_of(self, leader_path):
        """Return every property path of the leader's parent but the first one."""
        leadership_path = leader_path.rsplit('.', 1)[0]
        parent_schema = self.get_schema(leadership_path)
        return list(parent_schema['properties'].keys())[1:]

    def default_value(self, path):
        """Return the schema ``value`` of ``path`` (``[]`` for an unset multi)."""
        schema = self.get_schema(path)
        value = schema.get('value')
        if value is None and schema.get('isMulti', False):
            value = []
        return value

    def updates_data(self, data):
        """Drop the pending updates and local changes, then adopt ``data['model']``."""
        self.updates = []
        self.temp.clear()
        self.model = data['model']

    def test_value(self,
                   path: str,
                   value: Any,
                   remote: bool):
        """Check ``value`` (or each item of a list) against the form pattern.

        Returns a truthy result on success. When not ``remote``, a failure
        stores an ``error`` entry in ``temp`` and a success removes it.
        """
        if isinstance(value, list):
            for val in value:
                if not self.test_value(path, val, remote):
                    return False
            return True
        if value is None:
            match = True
        else:
            match = self.form[path]['pattern'].search(value)
        if not remote:
            if not match:
                self.temp.setdefault(path, {})['error'] = ['']
            elif 'error' in self.temp.get(path, {}):
                # the error is stored in temp, so that is where it is cleared
                del self.temp[path]['error']
        return match

    def set_dependencies(self,
                         path: str,
                         ori_value: Any,
                         force_hide: bool=False) -> None:
        """Apply the form ``dependencies`` of ``path`` for ``ori_value``.

        Each path listed under ``hide``/``show`` gets its ``hidden`` flag set
        in ``temp``, then its own dependencies are processed; ``force_hide``
        hides the target whatever the action.
        """
        dependencies = self.form.get(path, {}).get('dependencies', {})
        if not dependencies:
            return
        if ori_value in dependencies['expected']:
            expected = dependencies['expected'][ori_value]
        else:
            expected = dependencies['default']
        for action in ['hide', 'show']:
            expected_actions = expected.get(action)
            if not expected_actions:
                continue
            hidden = True if force_hide else action == 'hide'
            for expected_path in expected_actions:
                self.temp.setdefault(expected_path, {})['hidden'] = hidden
                if 'value' in self.temp[expected_path]:
                    value = self.temp[expected_path]['value']
                else:
                    # the target may have no model entry at all
                    value = self.model.get(expected_path, {}).get('value')
                self.set_dependencies(expected_path, value, hidden)

    def set_not_equal(self,
                      path: str,
                      value: Any) -> None:
        """Apply the form ``not_equal`` constraint of ``path``.

        The values of ``path`` and of the listed options are compared; options
        sharing a non-None value get a message appended to their model
        ``warnings`` (when the constraint is warnings-only) or ``error`` list.
        Without conflict, both lists are removed for the options involved.
        """
        not_equal = self.form.get(path, {}).get('not_equal', {})
        if not not_equal:
            return
        # vals[i] is a value owned by the option opts[i]
        vals = []
        opts = []
        candidates = [(path, value)]
        candidates.extend((path_, self.get_value(path_)) for path_ in not_equal['options'])
        for opt_path, opt_value in candidates:
            if isinstance(opt_value, list):
                for val in opt_value:
                    vals.append(val)
                    opts.append(opt_path)
            else:
                vals.append(opt_value)
                opts.append(opt_path)
        equal = []
        warnings_only = not_equal.get('warnings', False)
        if warnings_only:
            msg = _('should be different from the value of "{}"')
            msgcurr = _('value for {} should be different')
        else:
            msg = _('must be different from the value of "{}"')
            msgcurr = _('value for {} must be different')
        for idx_inf, val_inf in enumerate(vals):
            for idx_sup, val_sup in enumerate(vals[idx_inf + 1:], idx_inf + 1):
                if val_inf == val_sup and val_sup is not None:
                    for opt_ in [opts[idx_inf], opts[idx_sup]]:
                        if opt_ not in equal:
                            equal.append(opt_)
        if equal:
            equal_name = {opt: self.get_schema(opt)['title'] for opt in equal}
            for opt_ in equal:
                # name the *other* conflicting options, not the option itself
                display_equal = ', '.join(equal_name[other] for other in equal if other != opt_)
                if opt_ == path:
                    msg_ = msgcurr.format(display_equal)
                else:
                    msg_ = msg.format(display_equal)
                key = 'warnings' if warnings_only else 'error'
                self.model.setdefault(opt_, {}).setdefault(key, []).append(msg_)
        else:
            for opt in opts:
                entry = self.model.get(opt, {})
                if 'warnings' in entry:
                    del entry['warnings']
                if 'error' in entry:
                    del entry['error']

    def do_copy(self,
                path: str,
                value: Any) -> None:
        """Copy ``value`` to each option of the form ``copy`` list that still
        has the ``default`` owner."""
        targets = self.form.get(path, {}).get('copy')
        if targets:
            for opt in targets:
                # FIXME follower!
                owner = self.get_owner(opt, None)
                if owner == 'default':
                    # do not change in self.temp, it's default value
                    self.model.setdefault(opt, {})['value'] = value

    def send_data(self,
                  updates):
        """Send ``updates`` to the remote side; to be provided by a subclass."""
        raise NotImplementedError('please implement send_data function')