tiramisu_json_api => tiramisu_api
This commit is contained in:
5
tiramisu_api/__init__.py
Normal file
5
tiramisu_api/__init__.py
Normal file
@ -0,0 +1,5 @@
|
||||
from .api import Config
|
||||
|
||||
__version__ = "0.1"
|
||||
__all__ = ('Config',)
|
||||
|
1392
tiramisu_api/api.py
Normal file
1392
tiramisu_api/api.py
Normal file
@ -0,0 +1,1392 @@
|
||||
from typing import Optional, Dict, List, Any
|
||||
from copy import copy
|
||||
import warnings
|
||||
import re
|
||||
|
||||
|
||||
from .error import APIError, ConfigError, ValueWarning, ValueOptionError, ValueErrorWarning, PropertiesOptionError, display_list
|
||||
from .setting import undefined
|
||||
from .i18n import _
|
||||
|
||||
|
||||
TIRAMISU_FORMAT = '1.0'
|
||||
DEBUG = False
|
||||
|
||||
|
||||
TYPE = {'boolean': bool,
|
||||
'integer': int,
|
||||
'string': str,
|
||||
'password': str,
|
||||
'filename': str,
|
||||
'email': str,
|
||||
'url': str,
|
||||
'ip': str,
|
||||
'network': str,
|
||||
'netmask': str,
|
||||
'broadcast_address': str,
|
||||
'port': str,
|
||||
'domainname': str,
|
||||
'date': str}
|
||||
|
||||
|
||||
class Option:
|
||||
# fake Option (IntOption, StrOption, ...)
|
||||
# only usefull for warnings
|
||||
def __init__(self,
|
||||
path,
|
||||
display_name):
|
||||
self.path = path
|
||||
self.display_name = display_name
|
||||
|
||||
def __call__(self):
|
||||
# suppose to be a weakref
|
||||
return self
|
||||
|
||||
def impl_getpath(self):
|
||||
return self.path
|
||||
|
||||
def impl_get_display_name(self):
|
||||
return self.display_name
|
||||
|
||||
|
||||
class TiramisuOptionOption:
|
||||
# config.option(path).option
|
||||
|
||||
def __call__(self,
|
||||
path: str,
|
||||
index: Optional[int]=None) -> 'TiramisuOption':
|
||||
# config.option(path).option(path)
|
||||
path = self._path + '.' + path
|
||||
schema = self.config.get_schema(path)
|
||||
if schema['type'] in ['object', 'array']:
|
||||
return TiramisuOptionDescription(self.config,
|
||||
schema,
|
||||
path)
|
||||
return TiramisuOption(self.config,
|
||||
schema,
|
||||
path,
|
||||
index)
|
||||
def __init__(self,
|
||||
config: 'Config',
|
||||
path: str,
|
||||
schema: Dict) -> None:
|
||||
self.config = config
|
||||
self._path = path
|
||||
self.schema = schema
|
||||
|
||||
def doc(self):
|
||||
if self.issymlinkoption():
|
||||
schema = self.config.get_schema(self.schema['opt_path'])
|
||||
else:
|
||||
schema = self.schema
|
||||
return schema['title']
|
||||
|
||||
def path(self):
|
||||
return self._path
|
||||
|
||||
def name(self,
|
||||
follow_symlink: bool=False) -> str:
|
||||
if not follow_symlink or \
|
||||
self.isoptiondescription() or \
|
||||
not self.issymlinkoption():
|
||||
path = self._path
|
||||
else:
|
||||
path = self.schema['opt_path']
|
||||
return path.rsplit('.', 1)[-1]
|
||||
|
||||
def isoptiondescription(self):
|
||||
return self.schema['type'] in ['object', 'array']
|
||||
|
||||
def isleadership(self):
|
||||
return self.schema['type'] == 'array'
|
||||
|
||||
def isleader(self):
|
||||
return self.config.isleader(self._path)
|
||||
|
||||
def isfollower(self):
|
||||
return self.config.isfollower(self._path)
|
||||
|
||||
def issymlinkoption(self) -> bool:
|
||||
return self.schema['type'] == 'symlink'
|
||||
|
||||
def ismulti(self) -> bool:
|
||||
return self.schema.get('isMulti', False)
|
||||
|
||||
def issubmulti(self) -> bool:
|
||||
return self.schema.get('isSubMulti', False)
|
||||
|
||||
def type(self) -> str:
|
||||
if self.isleadership():
|
||||
return 'leadership'
|
||||
if self.isoptiondescription():
|
||||
return 'optiondescription'
|
||||
if self.issymlinkoption():
|
||||
return self.config.get_schema(self.schema['opt_path'])['type']
|
||||
return self.schema['type']
|
||||
|
||||
def properties(self) -> List[str]:
|
||||
model = self.config.model.get(self._path, {})
|
||||
if self.isfollower():
|
||||
model = model.get('null', {})
|
||||
return self.config.get_properties(model, self._path, None)
|
||||
#
|
||||
# def requires(self) -> None:
|
||||
# # FIXME
|
||||
# return None
|
||||
|
||||
def pattern(self):
|
||||
if self._path in self.config.form:
|
||||
return self.config.form[self._path].get('pattern')
|
||||
else:
|
||||
return None
|
||||
|
||||
def defaultmulti(self):
|
||||
if not self.schema.get('isMulti'):
|
||||
raise Exception('defaultmulti avalaible only for a multi')
|
||||
return self.schema.get('defaultmulti')
|
||||
|
||||
|
||||
class TiramisuOptionProperty:
|
||||
# config.option(path).property
|
||||
def __init__(self,
|
||||
config: 'Config',
|
||||
path: str,
|
||||
index: Optional[int]) -> None:
|
||||
self.config = config
|
||||
self.path = path
|
||||
self.index = index
|
||||
|
||||
def get(self, only_raises=False):
|
||||
if not only_raises:
|
||||
model = self.config.model.get(self.path, {})
|
||||
props = self.config.get_properties(model, self.path, self.index, only_raises)
|
||||
else:
|
||||
props = []
|
||||
if self.config.is_hidden(self.path,
|
||||
self.index):
|
||||
props.append('hidden')
|
||||
return props
|
||||
|
||||
|
||||
class _Value:
|
||||
def _dict_walk(self,
|
||||
ret: Dict,
|
||||
schema: Dict,
|
||||
root: str,
|
||||
fullpath: bool,
|
||||
withwarning: bool,
|
||||
flatten: bool,
|
||||
len_parent: Optional[int]) -> None:
|
||||
leadership_len = None
|
||||
for key, option in schema['properties'].items():
|
||||
if self.config.is_hidden(key, None) is False:
|
||||
if flatten:
|
||||
nkey = key.split('.')[-1]
|
||||
elif not fullpath and len_parent is not None:
|
||||
nkey = key[len_parent:]
|
||||
else:
|
||||
nkey = key
|
||||
if option['type'] in ['object', 'array']:
|
||||
# optiondescription or leadership
|
||||
self._dict_walk(ret,
|
||||
option,
|
||||
root,
|
||||
fullpath,
|
||||
withwarning,
|
||||
flatten,
|
||||
len_parent)
|
||||
elif schema.get('type') == 'array' and leadership_len is not None:
|
||||
# followers
|
||||
values = []
|
||||
for index in range(leadership_len):
|
||||
value = self.config.get_value(key,
|
||||
index)
|
||||
self.config._check_raises_warnings(key, index, value, option['type'], withwarning)
|
||||
values.append(value)
|
||||
ret[nkey] = values
|
||||
else:
|
||||
value = self.config.get_value(key)
|
||||
self.config._check_raises_warnings(key, None, value, option['type'], withwarning)
|
||||
ret[nkey] = value
|
||||
if schema.get('type') == 'array':
|
||||
leadership_len = len(value)
|
||||
elif schema.get('type') == 'array' and leadership_len is None:
|
||||
# if leader is hidden, followers are hidden too
|
||||
break
|
||||
|
||||
def dict(self,
|
||||
fullpath: bool=False,
|
||||
withwarning: bool=False,
|
||||
flatten: bool=False):
|
||||
ret = {}
|
||||
self._dict_walk(ret,
|
||||
self.schema,
|
||||
self.path,
|
||||
fullpath,
|
||||
withwarning,
|
||||
flatten,
|
||||
None)
|
||||
return ret
|
||||
|
||||
|
||||
class TiramisuOptionOwner:
|
||||
# config.option(path).owner
|
||||
def __init__(self,
|
||||
config: 'Config',
|
||||
schema: Dict,
|
||||
path: str,
|
||||
index: int) -> None:
|
||||
self.config = config
|
||||
self.schema = schema
|
||||
self.path = path
|
||||
self.index = index
|
||||
|
||||
def isdefault(self) -> Any:
|
||||
return self.config.get_owner(self.path, self.index) == 'default'
|
||||
|
||||
def get(self) -> str:
|
||||
return self.config.get_owner(self.path, self.index)
|
||||
|
||||
|
||||
class TiramisuOptionDescriptionValue(_Value):
|
||||
# config.option(descr_path).value
|
||||
def __init__(self,
|
||||
config: 'Config',
|
||||
schema: Dict,
|
||||
path: str,
|
||||
index: int) -> None:
|
||||
self.config = config
|
||||
self.schema = schema
|
||||
self.path = path
|
||||
self.index = index
|
||||
|
||||
def dict(self,
|
||||
fullpath: bool=False,
|
||||
withwarning: bool=False,
|
||||
flatten: bool=False):
|
||||
ret = {}
|
||||
len_parent = len(self.path) + 1
|
||||
self._dict_walk(ret,
|
||||
self.schema,
|
||||
self.path,
|
||||
fullpath,
|
||||
withwarning,
|
||||
flatten,
|
||||
len_parent)
|
||||
return ret
|
||||
|
||||
|
||||
class TiramisuOptionValue(_Value):
|
||||
# config.option(path).value
|
||||
def __init__(self,
|
||||
config: 'Config',
|
||||
schema: Dict,
|
||||
path: str,
|
||||
index: int) -> None:
|
||||
self.config = config
|
||||
self.schema = schema
|
||||
self.path = path
|
||||
self.index = index
|
||||
|
||||
def get(self) -> Any:
|
||||
if self.schema['type'] == 'symlink':
|
||||
# FIXME should tested it too
|
||||
pass
|
||||
elif self.config.isfollower(self.path):
|
||||
if self.index is None:
|
||||
raise APIError(_('index must be set with the follower option "{}"').format(self.path))
|
||||
elif self.index is not None:
|
||||
raise APIError(_('index must only be set with a follower option, not for "{}"').format(self.path))
|
||||
if self.config.is_hidden(self.path, self.index):
|
||||
raise PropertiesOptionError(None, {'disabled'}, None, opt_type='option')
|
||||
value = self.config.get_value(self.path, self.index)
|
||||
self.config._check_raises_warnings(self.path, self.index, value, self.schema['type'])
|
||||
return value
|
||||
|
||||
def list(self):
|
||||
return self.schema['enum']
|
||||
|
||||
def _validate(self, type_, value):
|
||||
if value in [None, undefined]:
|
||||
return
|
||||
if type_ == 'symlink':
|
||||
raise ConfigError(_("can't assign to a SymLinkOption"))
|
||||
if type_ == 'choice':
|
||||
if 'enum' in self.schema and value not in self.schema['enum']:
|
||||
raise ValueError('value {} is not in {}'.format(value, self.schema['enum']))
|
||||
elif not isinstance(value, TYPE[type_]):
|
||||
raise ValueError('value {} is not a valid {} '.format(value, type_))
|
||||
|
||||
def set(self, value):
|
||||
type_ = self.schema['type']
|
||||
leader_old_value = undefined
|
||||
if self.config.is_hidden(self.path, self.index):
|
||||
raise PropertiesOptionError(None, {'disabled'}, None, opt_type='option')
|
||||
if self.config.isleader(self.path):
|
||||
leader_old_value = self.config.get_value(self.path)
|
||||
remote = self.config.form.get(self.path, {}).get('remote', False)
|
||||
if self.index is None and self.schema.get('isMulti', False):
|
||||
if not isinstance(value, list):
|
||||
raise ValueError('value must be a list')
|
||||
for val in value:
|
||||
if self.schema.get('isSubMulti', False):
|
||||
for v in val:
|
||||
self._validate(type_, v)
|
||||
else:
|
||||
self._validate(type_, val)
|
||||
else:
|
||||
if self.schema.get('isSubMulti', False):
|
||||
for val in value:
|
||||
self._validate(type_, val)
|
||||
else:
|
||||
self._validate(type_, value)
|
||||
if self.path in self.config.temp:
|
||||
obj = None
|
||||
if self.index is None:
|
||||
obj = self.config.temp[self.path]
|
||||
elif str(self.index) in self.config.temp[self.path]:
|
||||
obj = self.config.temp[self.path][str(self.index)]
|
||||
if obj is not None:
|
||||
for attr in ['error', 'warnings', 'invalid', 'hasWarnings']:
|
||||
if attr in obj:
|
||||
del obj[attr]
|
||||
self.config.modify_value(self.path,
|
||||
self.index,
|
||||
value,
|
||||
remote,
|
||||
leader_old_value)
|
||||
self.config._check_raises_warnings(self.path, self.index, value, type_)
|
||||
|
||||
def reset(self):
|
||||
self.config.delete_value(self.path,
|
||||
self.index)
|
||||
|
||||
def default(self):
|
||||
if self.schema.get('isMulti'):
|
||||
if self.config.isfollower(self.path):
|
||||
if 'defaultmulti' in self.schema:
|
||||
defaultmulti = self.schema['defaultmulti']
|
||||
else:
|
||||
defaultmulti = None
|
||||
if self.index is not None:
|
||||
value = defaultmulti
|
||||
else:
|
||||
leader = next(iter(self.config.option(self.path.rsplit('.', 1)[0]).schema['properties']))
|
||||
len_value = len(self.config.get_value(leader))
|
||||
value = [defaultmulti] * len_value
|
||||
else:
|
||||
value = self.schema.get('value', [])
|
||||
else:
|
||||
value = self.schema.get('value')
|
||||
return value
|
||||
|
||||
def valid(self):
|
||||
temp = self.config.temp.get(self.path, {})
|
||||
model = self.config.model.get(self.path, {})
|
||||
if self.index is None:
|
||||
if 'invalid' in temp:
|
||||
return not temp['invalid']
|
||||
return not model.get('invalid', False)
|
||||
elif str(self.index) in temp and 'invalid' in temp[str(self.index)]:
|
||||
return not temp[str(self.index)]['invalid']
|
||||
elif str(self.index) in model:
|
||||
return not model[str(self.index)].get('invalid', False)
|
||||
return True
|
||||
|
||||
def warning(self):
|
||||
temp = self.config.temp.get(self.path, {})
|
||||
model = self.config.model.get(self.path, {})
|
||||
if self.index is None:
|
||||
if 'hasWarnings' in temp:
|
||||
return temp['hasWarnings']
|
||||
return model.get('hasWarnings', False)
|
||||
elif str(self.index) in temp and 'hasWarnings' in temp[str(self.index)]:
|
||||
return temp[str(self.index)]['hasWarnings']
|
||||
elif str(self.index) in model:
|
||||
return model[str(self.index)].get('hasWarnings', False)
|
||||
return False
|
||||
|
||||
def error_message(self):
|
||||
temp = self.config.temp.get(self.path, {})
|
||||
model = self.config.model.get(self.path, {})
|
||||
if self.valid():
|
||||
return []
|
||||
if self.index is None:
|
||||
if temp.get('invalid') == True:
|
||||
return temp['error']
|
||||
return model['error']
|
||||
elif str(self.index) in temp and 'invalid' in temp[str(self.index)]:
|
||||
return temp[str(self.index)]['error']
|
||||
else:
|
||||
return model[str(self.index)]['error']
|
||||
return []
|
||||
|
||||
def warning_message(self):
|
||||
temp = self.config.temp.get(self.path, {})
|
||||
model = self.config.model.get(self.path, {})
|
||||
if not self.warning():
|
||||
return []
|
||||
if self.index is None:
|
||||
if temp.get('hasWarnings') == True:
|
||||
return temp['warnings']
|
||||
return model['warnings']
|
||||
elif str(self.index) in temp and 'hasWarnings' in temp[str(self.index)]:
|
||||
return temp[str(self.index)]['warnings']
|
||||
else:
|
||||
return model[str(self.index)]['warnings']
|
||||
return []
|
||||
|
||||
|
||||
class _Option:
|
||||
def list(self,
|
||||
type='option',
|
||||
recursive=False):
|
||||
if type not in ['all', 'option', 'optiondescription']:
|
||||
raise Exception('unknown list type {}'.format(type))
|
||||
for idx_path, path in enumerate(self.schema['properties']):
|
||||
subschema = self.schema['properties'][path]
|
||||
if not self.config.is_hidden(path, None):
|
||||
if subschema['type'] in ['object', 'array']:
|
||||
if type in ['all', 'optiondescription']:
|
||||
yield TiramisuOptionDescription(self.config,
|
||||
subschema,
|
||||
path)
|
||||
if recursive:
|
||||
yield from TiramisuOptionDescription(self.config,
|
||||
subschema,
|
||||
path).list(type, recursive)
|
||||
elif type in ['all', 'option']:
|
||||
yield TiramisuOption(self.config,
|
||||
subschema,
|
||||
path,
|
||||
self.index)
|
||||
elif self.schema.get('type') == 'array' and idx_path == 0:
|
||||
# if a leader is hidden, follower are hidden too
|
||||
break
|
||||
|
||||
|
||||
|
||||
class TiramisuOptionDescription(_Option):
|
||||
# config.option(path) (with path == OptionDescription)
|
||||
def __init__(self,
|
||||
config: 'Config',
|
||||
schema: Dict,
|
||||
path: str) -> None:
|
||||
self.config = config
|
||||
self.schema = schema
|
||||
self.path = path
|
||||
self.index = None
|
||||
|
||||
def __getattr__(self,
|
||||
subfunc: str) -> Any:
|
||||
if subfunc == 'option':
|
||||
return TiramisuOptionOption(self.config,
|
||||
self.path,
|
||||
self.schema)
|
||||
if subfunc == 'property':
|
||||
return TiramisuOptionProperty(self.config,
|
||||
self.path,
|
||||
None)
|
||||
if subfunc == 'value':
|
||||
return TiramisuOptionDescriptionValue(self.config,
|
||||
self.schema,
|
||||
self.path,
|
||||
self.index)
|
||||
raise APIError(_('please specify a valid sub function ({})').format(subfunc))
|
||||
|
||||
def group_type(self):
|
||||
if not self.config.is_hidden(self.path, None):
|
||||
# FIXME
|
||||
return 'default'
|
||||
raise PropertiesOptionError(None, {'disabled'}, None, opt_type='optiondescription')
|
||||
|
||||
|
||||
class TiramisuLeadershipValue(TiramisuOptionValue):
|
||||
def len(self):
|
||||
return len(self.config.get_value(self.path))
|
||||
|
||||
def pop(self,
|
||||
index: int) -> None:
|
||||
self.config.delete_value(self.path, index)
|
||||
|
||||
|
||||
class TiramisuOption:
|
||||
# config.option(path) (with path == Option)
|
||||
def __init__(self,
|
||||
config: 'Config',
|
||||
schema: Dict,
|
||||
path: str,
|
||||
index: Optional[int]) -> None:
|
||||
self.config = config
|
||||
self.schema = schema
|
||||
self.path = path
|
||||
self.index = index
|
||||
|
||||
def __getattr__(self,
|
||||
subfunc: str) -> Any:
|
||||
if subfunc == 'option':
|
||||
if self.index != None:
|
||||
raise NotImplementedError()
|
||||
return TiramisuOptionOption(self.config,
|
||||
self.path,
|
||||
self.schema)
|
||||
if subfunc == 'value':
|
||||
if self.config.isleader(self.path):
|
||||
obj = TiramisuLeadershipValue
|
||||
else:
|
||||
obj = TiramisuOptionValue
|
||||
return obj(self.config,
|
||||
self.schema,
|
||||
self.path,
|
||||
self.index)
|
||||
if subfunc == 'owner':
|
||||
return TiramisuOptionOwner(self.config,
|
||||
self.schema,
|
||||
self.path,
|
||||
self.index)
|
||||
if subfunc == 'property':
|
||||
return TiramisuOptionProperty(self.config,
|
||||
self.path,
|
||||
self.index)
|
||||
raise APIError(_('please specify a valid sub function ({})').format(subfunc))
|
||||
|
||||
|
||||
class TiramisuContextProperty:
|
||||
# config.property
|
||||
def __init__(self,
|
||||
config):
|
||||
self.config = config
|
||||
|
||||
def get(self):
|
||||
return self.config.global_model.get('properties', [])
|
||||
|
||||
|
||||
class ContextOption(_Option):
|
||||
# config.option
|
||||
def __init__(self,
|
||||
config: 'Config',
|
||||
schema: Dict) -> None:
|
||||
self.config = config
|
||||
self.schema = {'properties': schema}
|
||||
self.index = None
|
||||
|
||||
def __call__(self,
|
||||
path: str,
|
||||
index: Optional[int]=None) -> TiramisuOption:
|
||||
schema = self.config.get_schema(path)
|
||||
if schema['type'] in ['object', 'array']:
|
||||
return TiramisuOptionDescription(self.config,
|
||||
schema,
|
||||
path)
|
||||
return TiramisuOption(self.config,
|
||||
schema,
|
||||
path,
|
||||
index)
|
||||
|
||||
|
||||
class ContextOwner:
|
||||
# config.owner
|
||||
def __init__(self,
|
||||
config: 'Config',
|
||||
schema: Dict) -> None:
|
||||
self.config = config
|
||||
self.schema = {'properties': schema}
|
||||
self.index = None
|
||||
|
||||
def get(self):
|
||||
return self.config.global_model.get('owner', 'tmp')
|
||||
|
||||
|
||||
class ContextValue(_Value):
|
||||
# config.value
|
||||
def __init__(self,
|
||||
config: 'Config',
|
||||
schema: Dict) -> None:
|
||||
self.config = config
|
||||
first = next(iter(schema.keys()))
|
||||
self.path = first.rsplit('.', 1)[0]
|
||||
self.schema = {'properties': schema}
|
||||
|
||||
def __call__(self) -> TiramisuOptionValue:
|
||||
return TiramisuOptionValue(self.config,
|
||||
self.schema,
|
||||
path,
|
||||
index)
|
||||
|
||||
def mandatory(self):
|
||||
for key, value in self.dict().items():
|
||||
if self.config.isfollower(key):
|
||||
if self.config.model.get(key, {}).get('null', {}).get('required'):
|
||||
# FIXME test with index
|
||||
if self.config.get_schema(key).get('isSubMulti'):
|
||||
for val in value:
|
||||
if not val or None in val or '' in val:
|
||||
yield key
|
||||
break
|
||||
elif None in value or '' in value:
|
||||
yield key
|
||||
elif self.config.get_schema(key).get('isMulti'):
|
||||
if self.config.model.get(key, {}).get('required') and (None in value or '' in value):
|
||||
yield key
|
||||
if self.config.model.get(key, {}).get('needs_len') and not value:
|
||||
yield key
|
||||
elif self.config.model.get(key, {}).get('required') and value is None:
|
||||
yield key
|
||||
|
||||
|
||||
class Config:
|
||||
# config
|
||||
def __init__(self,
|
||||
dico):
|
||||
if DEBUG:
|
||||
from pprint import pprint
|
||||
pprint(dico)
|
||||
if dico.get('version') != TIRAMISU_FORMAT:
|
||||
raise Exception('incompatible version of tiramisu (got {}, expected {})'.format(dico.get('version', '0.0'), TIRAMISU_FORMAT))
|
||||
self.model = dico.get('model')
|
||||
self.global_model = dico.get('global')
|
||||
self.form = dico.get('form')
|
||||
# support pattern
|
||||
if self.form:
|
||||
for key, option in self.form.items():
|
||||
if key != 'null' and 'pattern' in option:
|
||||
option['pattern'] = re.compile(option['pattern'])
|
||||
self.temp = {}
|
||||
self.schema = dico.get('schema')
|
||||
self.updates = []
|
||||
if self.schema:
|
||||
first_path = next(iter(self.schema.keys()))
|
||||
else:
|
||||
first_path = ''
|
||||
if '.' in first_path:
|
||||
self.root = first_path.rsplit('.', 1)[0]
|
||||
else:
|
||||
self.root = ''
|
||||
|
||||
def __getattr__(self,
|
||||
subfunc: str) -> Any:
|
||||
if subfunc == 'property':
|
||||
return TiramisuContextProperty(self)
|
||||
if subfunc == 'option':
|
||||
return ContextOption(self,
|
||||
self.schema)
|
||||
if subfunc == 'value':
|
||||
return ContextValue(self,
|
||||
self.schema)
|
||||
if subfunc == 'owner':
|
||||
return ContextOwner(self,
|
||||
self.schema)
|
||||
raise APIError(_('please specify a valid sub function ({})').format(subfunc))
|
||||
|
||||
def add_value(self,
|
||||
path: str,
|
||||
index: Optional[int],
|
||||
value: Any,
|
||||
remote: bool) -> None:
|
||||
if remote:
|
||||
self.manage_updates('add',
|
||||
path,
|
||||
index,
|
||||
value)
|
||||
self.updates_value('add',
|
||||
path,
|
||||
index,
|
||||
value,
|
||||
remote)
|
||||
if not remote:
|
||||
self.manage_updates('add',
|
||||
path,
|
||||
index,
|
||||
value)
|
||||
|
||||
def modify_value(self,
|
||||
path: str,
|
||||
index: Optional[int],
|
||||
value: Any,
|
||||
remote: bool,
|
||||
leader_old_value: Any) -> None:
|
||||
schema = self.get_schema(path)
|
||||
has_undefined = value is not None and isinstance(value, list) and undefined in value
|
||||
new_value = schema.get('defaultmulti')
|
||||
if not remote:
|
||||
if has_undefined:
|
||||
while undefined in value:
|
||||
undefined_index = value.index(undefined)
|
||||
schema_value = schema.get('value', [])
|
||||
if len(schema_value) > undefined_index:
|
||||
value[undefined_index] = schema_value[undefined_index]
|
||||
else:
|
||||
value[undefined_index] = new_value
|
||||
self.updates_value('modify',
|
||||
path,
|
||||
index,
|
||||
value,
|
||||
remote,
|
||||
False,
|
||||
leader_old_value)
|
||||
self.manage_updates('modify',
|
||||
path,
|
||||
index,
|
||||
value)
|
||||
|
||||
elif has_undefined:
|
||||
for idx, val in enumerate(value):
|
||||
self.manage_updates('modify',
|
||||
path,
|
||||
idx,
|
||||
val)
|
||||
else:
|
||||
self.manage_updates('modify',
|
||||
path,
|
||||
index,
|
||||
value)
|
||||
if remote:
|
||||
self.updates_value('modify',
|
||||
path,
|
||||
index,
|
||||
value,
|
||||
remote,
|
||||
False,
|
||||
leader_old_value)
|
||||
|
||||
def delete_value(self,
|
||||
path: str,
|
||||
index: Optional[int]) -> None:
|
||||
if self.get_schema(path)['type'] == 'symlink':
|
||||
raise ConfigError(_("can't delete a SymLinkOption"))
|
||||
remote = self.form.get(path, {}).get('remote', False)
|
||||
if remote:
|
||||
self.manage_updates('delete',
|
||||
path,
|
||||
index,
|
||||
None)
|
||||
self.updates_value('delete',
|
||||
path,
|
||||
index,
|
||||
None,
|
||||
remote)
|
||||
if not remote:
|
||||
self.manage_updates('delete',
|
||||
path,
|
||||
index,
|
||||
None)
|
||||
|
||||
def get_properties(self,
|
||||
model,
|
||||
path,
|
||||
index,
|
||||
only_raises=True):
|
||||
props = model.get('properties', [])[:]
|
||||
if model.get('required'):
|
||||
if self.get_schema(path).get('isMulti', False) and not self.isfollower(path):
|
||||
props.append('empty')
|
||||
else:
|
||||
props.append('mandatory')
|
||||
if model.get('needs_len'):
|
||||
props.append('mandatory')
|
||||
if model.get('readOnly'):
|
||||
props.append('frozen')
|
||||
if only_raises and self.is_hidden(path,
|
||||
index):
|
||||
props.append('hidden')
|
||||
if self.form.get(path, {}).get('clearable'):
|
||||
props.append('clearable')
|
||||
return props
|
||||
|
||||
def get_schema(self,
|
||||
path):
|
||||
root_path = self.root
|
||||
schema = {'properties': self.schema,
|
||||
'type': 'object'}
|
||||
if root_path:
|
||||
root = self.root.split('.')
|
||||
if not path.startswith(self.root):
|
||||
raise Exception('cannot find {0}'.format(path))
|
||||
subpaths = path.split('.')[len(root):]
|
||||
else:
|
||||
subpaths = path.split('.')
|
||||
for subpath in subpaths:
|
||||
if root_path:
|
||||
root_path += '.' + subpath
|
||||
else:
|
||||
root_path = subpath
|
||||
schema = schema['properties'][root_path]
|
||||
return schema
|
||||
|
||||
def isleader(self,
|
||||
path):
|
||||
if '.' in path:
|
||||
parent_schema = self.get_schema(path.rsplit('.', 1)[0])
|
||||
if parent_schema['type'] == 'array':
|
||||
leader = next(iter(parent_schema['properties'].keys()))
|
||||
return leader == path
|
||||
return False
|
||||
|
||||
|
||||
def isfollower(self,
|
||||
path: str) -> bool:
|
||||
if '.' in path:
|
||||
parent_schema = self.get_schema(path.rsplit('.', 1)[0])
|
||||
leader = next(iter(parent_schema['properties'].keys()))
|
||||
if parent_schema['type'] == 'array' and \
|
||||
leader != path:
|
||||
return True
|
||||
return False
|
||||
|
||||
def is_hidden(self,
|
||||
path: str,
|
||||
index: Optional[int],
|
||||
permissive: bool=False) -> bool:
|
||||
if permissive:
|
||||
property_ = 'hidden'
|
||||
needs = True
|
||||
if property_ in self.global_model.get('permissives', []):
|
||||
return False
|
||||
else:
|
||||
property_ = 'display'
|
||||
needs = False
|
||||
if index is not None and property_ in self.temp.get(path, {}).get(str(index), {}):
|
||||
return self.temp[path][str(index)][property_] == needs
|
||||
if property_ in self.temp.get(path, {}):
|
||||
return self.temp[path][property_] == needs
|
||||
elif self.isfollower(path):
|
||||
if self.model.get(path, {}).get('null', {}).get(property_, None) == needs:
|
||||
return True
|
||||
elif self.model.get(path, {}).get(property_, None) == needs:
|
||||
return True
|
||||
if index is not None:
|
||||
index = str(index)
|
||||
if self.model.get(path, {}).get(index, {}).get(property_) == needs:
|
||||
return True
|
||||
return False
|
||||
|
||||
def get_from_temp_model(self,
|
||||
path,
|
||||
index):
|
||||
if path in self.temp:
|
||||
is_delete = 'delete' in self.temp[path] and self.temp[path]['delete'] == True
|
||||
if index is not None and not is_delete and 'delete' in self.temp[path].get(index, {}):
|
||||
is_delete = self.temp[path][index]['delete'] == True
|
||||
if is_delete:
|
||||
return None
|
||||
if index is None and 'value' in self.temp[path] or 'value' in self.temp[path].get(index, {}):
|
||||
return self.temp[path]
|
||||
return self.model.get(path)
|
||||
|
||||
def get_value(self,
|
||||
path: str,
|
||||
index: int=None) -> Any:
|
||||
schema = self.get_schema(path)
|
||||
if schema['type'] == 'symlink':
|
||||
path = schema['opt_path']
|
||||
schema = self.get_schema(path)
|
||||
if index is None:
|
||||
if self.isfollower(path):
|
||||
value = []
|
||||
parent_schema = self.get_schema(path.rsplit('.', 1)[0])
|
||||
leader = next(iter(parent_schema['properties'].keys()))
|
||||
leadership_len = len(self.get_value(leader))
|
||||
for idx in range(leadership_len):
|
||||
val = self.get_value(path,
|
||||
idx)
|
||||
self._check_raises_warnings(path, idx, val, schema['type'])
|
||||
value.append(val)
|
||||
else:
|
||||
value = self.get_from_temp_model(path, index)
|
||||
if value is not None and 'value' in value:
|
||||
value = value['value']
|
||||
else:
|
||||
value = schema.get('value')
|
||||
if value is None and schema.get('isMulti', False):
|
||||
value = []
|
||||
else:
|
||||
index = str(index)
|
||||
if self.isfollower(path):
|
||||
if self.is_hidden(path, index):
|
||||
value = PropertiesOptionError(None, {'disabled'}, None, opt_type='option')
|
||||
else:
|
||||
value = self.get_from_temp_model(path, index)
|
||||
if value is not None and 'value' in value.get(index, {}):
|
||||
value = value[index]['value']
|
||||
else:
|
||||
value = schema.get('defaultmulti')
|
||||
else:
|
||||
value = self.get_from_temp_model(path, index)
|
||||
if value is not None and index in value and 'value' in value[index]:
|
||||
value = value[index]['value']
|
||||
else:
|
||||
value = schema.get('default')
|
||||
if value is None and schema.get('isSubMulti', False):
|
||||
value = []
|
||||
if isinstance(value, list):
|
||||
value = value.copy()
|
||||
return value
|
||||
|
||||
def get_owner(self,
|
||||
path: str,
|
||||
index: int) -> str:
|
||||
schema = self.get_schema(path)
|
||||
if schema['type'] == 'symlink':
|
||||
opt_path = schema['opt_path']
|
||||
index = str(index)
|
||||
if self.is_hidden(opt_path, index):
|
||||
raise PropertiesOptionError(None, {'disabled'}, None, opt_type='option')
|
||||
return self.get_owner(opt_path, index)
|
||||
elif not self.isfollower(path):
|
||||
if 'owner' in self.temp.get(path, {}):
|
||||
owner = self.temp[path]['owner']
|
||||
else:
|
||||
owner = self.model.get(path, {}).get('owner', 'default')
|
||||
else:
|
||||
index = str(index)
|
||||
if self.is_hidden(path, index):
|
||||
raise PropertiesOptionError(None, {'disabled'}, None, opt_type='option')
|
||||
value = self.get_from_temp_model(path, index)
|
||||
if value is not None and 'owner' in value.get(index, {}):
|
||||
owner = value[index]['owner']
|
||||
else:
|
||||
owner = 'default'
|
||||
return owner
|
||||
|
||||
def manage_updates(self,
|
||||
action,
|
||||
path,
|
||||
index,
|
||||
value):
|
||||
update_last_action = False
|
||||
if self.updates:
|
||||
last_body = self.updates[-1]
|
||||
if last_body['name'] == path:
|
||||
if index is None and not 'index' in last_body:
|
||||
last_action = last_body['action']
|
||||
if last_action == action or \
|
||||
last_action in ['delete', 'modify'] and action in ['delete', 'modify']:
|
||||
update_last_action = True
|
||||
elif index == None and action == 'delete':
|
||||
for update in reversed(self.updates):
|
||||
if update['name'] == path:
|
||||
del self.updates[-1]
|
||||
else:
|
||||
break
|
||||
elif last_body.get('index') == index:
|
||||
if last_body['action'] == 'add' and action == 'modify':
|
||||
action = 'add'
|
||||
update_last_action = True
|
||||
elif last_body['action'] == action and action != 'delete' or \
|
||||
last_body['action'] == 'modify' and action == 'delete':
|
||||
update_last_action = True
|
||||
elif last_body['action'] == 'add' and action == 'delete':
|
||||
del self.updates[-1]
|
||||
|
||||
if update_last_action:
|
||||
if action == 'delete' and value is None:
|
||||
if 'value' in last_body:
|
||||
del last_body['value']
|
||||
else:
|
||||
last_body['value'] = value
|
||||
if index is None and 'index' in last_body:
|
||||
del last_body['index']
|
||||
last_body['action'] = action
|
||||
else:
|
||||
data = {'action': action,
|
||||
'name': path}
|
||||
if action != 'delete' and value is not undefined:
|
||||
data['value'] = value
|
||||
if index is not None:
|
||||
data['index'] = index
|
||||
self.updates.append(data)
|
||||
|
||||
def send(self):
|
||||
if DEBUG:
|
||||
print('<===== send')
|
||||
print(self.updates)
|
||||
self.send_data({'updates': self.updates,
|
||||
'model': self.model})
|
||||
|
||||
def updates_value(self,
|
||||
action: str,
|
||||
path: str,
|
||||
index: Optional[int],
|
||||
value: Optional[Any],
|
||||
remote: bool,
|
||||
default_value: bool=False,
|
||||
leader_old_value: Optional[Any]=undefined) -> None:
|
||||
# if 'pattern' in self.form.get(path, {}) and (not isinstance(value, list) or undefined not in value) and not self.test_value(path, index, value, remote):
|
||||
# return
|
||||
if remote:
|
||||
self.send()
|
||||
else:
|
||||
changes = []
|
||||
if self.test_value(path, index, value) and not self.is_hidden(path, index):
|
||||
changes.append(path)
|
||||
if path in self.model and (index is None or str(index) in self.model[path]):
|
||||
model = self.model[path]
|
||||
if index is not None:
|
||||
model = model[str(index)]
|
||||
if 'warnings' in model:
|
||||
del model['warnings']
|
||||
if 'error' in model:
|
||||
del model['error']
|
||||
if action == 'delete':
|
||||
if self.option(path).option.isleader():
|
||||
# if remove an indexed leader value
|
||||
if index is not None:
|
||||
leader_value = self.option(path).value.get()
|
||||
leader_value.pop(index)
|
||||
owner = self.global_model.get('owner', 'tmp')
|
||||
else:
|
||||
leader_value = self.default_value(path)
|
||||
owner = 'default'
|
||||
max_value = len(leader_value) + 1
|
||||
self._set_temp_value(path, None, leader_value, owner)
|
||||
leadership_path = path.rsplit('.', 1)[0]
|
||||
parent_schema = self.get_schema(leadership_path)
|
||||
iter_leadership = list(parent_schema['properties'].keys())
|
||||
for follower in iter_leadership[1:]:
|
||||
# remove value for this index and reduce len
|
||||
new_temp = {}
|
||||
for idx in range(max_value):
|
||||
cur_index = idx
|
||||
if index is not None:
|
||||
if index == idx:
|
||||
continue
|
||||
if index < cur_index:
|
||||
cur_index -= 1
|
||||
if 'delete' in self.temp.get(follower, {}):
|
||||
#FIXME copier les attributs hidden, ... depuis self.temp[follower][index] ?
|
||||
new_temp[str(cur_index)] = {'delete': True}
|
||||
elif self.temp.get(follower, {}).get(str(idx)) is not None:
|
||||
if index is None or index == idx:
|
||||
new_temp[str(cur_index)] = {'delete': True}
|
||||
else:
|
||||
new_temp[str(cur_index)] = self.temp[follower][str(idx)]
|
||||
elif self.model.get(follower, {}).get(str(idx)) is not None:
|
||||
if index is None or index == idx:
|
||||
new_temp[str(cur_index)] = {'delete': True}
|
||||
else:
|
||||
new_temp[str(cur_index)] = self.model[follower][str(idx)]
|
||||
if self.model.get(follower, {}).get(str(max_value)) is not None:
|
||||
# FIXME copier les attributs hidden, ... depuis self.temp[follower][index] ?
|
||||
new_temp[str(max_value)] = {'delete': True}
|
||||
self.temp[follower] = new_temp
|
||||
value = leader_value
|
||||
index = None
|
||||
elif index is None:
|
||||
self._del_temp_value(path, index)
|
||||
value = self.get_value(path)
|
||||
else:
|
||||
# it's a follower with index
|
||||
self.temp.setdefault(path, {})[str(index)] = {'delete': True}
|
||||
self._del_temp_value(path, index)
|
||||
value = self.get_value(path, index)
|
||||
default_value = True
|
||||
elif index is None:
|
||||
# set a value for a not follower option
|
||||
self.set_not_equal(path, value, index)
|
||||
if default_value is True:
|
||||
self.model[path]['value'] = value
|
||||
else:
|
||||
self._set_temp_value(path, None, value, self.global_model.get('owner', 'tmp'))
|
||||
else:
|
||||
self.set_not_equal(path, value, index)
|
||||
# set a value for a follower option
|
||||
if default_value is True:
|
||||
self.model[path][str(index)]['value'] = value
|
||||
else:
|
||||
self._set_temp_value(path, index, value, self.global_model.get('owner', 'tmp'))
|
||||
if not self.is_hidden(path, index):
|
||||
changes.append(path)
|
||||
self.set_dependencies(path, value, False, changes, index)
|
||||
self.do_copy(path, index, value, changes)
|
||||
if leader_old_value is not undefined and len(leader_old_value) < len(value):
|
||||
# if leader and length is change, display/hide follower from follower's default value
|
||||
index = len(value) - 1
|
||||
parent_path = '.'.join(path.split('.')[:-1])
|
||||
followers = list(self.option(parent_path).list())[1:]
|
||||
for follower in followers:
|
||||
follower_path = follower.option.path()
|
||||
try:
|
||||
follower_value = self.option(follower_path, index).value.get()
|
||||
self.set_dependencies(follower_path, follower_value, None, index)
|
||||
except PropertiesOptionError:
|
||||
pass
|
||||
for path in changes:
|
||||
self.send_event('tiramisu-change', path)
|
||||
|
||||
def _set_temp_value(self, path, index, value, owner):
|
||||
if index is not None:
|
||||
obj = self.temp.setdefault(path, {}).setdefault(str(index), {})
|
||||
else:
|
||||
obj = self.temp.setdefault(path, {})
|
||||
if 'delete' in obj:
|
||||
del obj['delete']
|
||||
obj['value'] = value
|
||||
obj['owner'] = owner
|
||||
|
||||
def _del_temp_value(self, path, index):
|
||||
if index is not None:
|
||||
obj = self.temp.setdefault(path, {}).setdefault(str(index), {})
|
||||
else:
|
||||
obj = self.temp.setdefault(path, {})
|
||||
for key in ['value', 'owner']:
|
||||
if key in obj:
|
||||
del obj[key]
|
||||
obj['delete'] = True
|
||||
|
||||
def default_value(self, path):
|
||||
schema = self.get_schema(path)
|
||||
value = schema.get('value')
|
||||
if value is None and schema.get('isMulti', False):
|
||||
value = []
|
||||
elif isinstance(value, list):
|
||||
value = value.copy()
|
||||
return value
|
||||
|
||||
def updates_data(self, data):
|
||||
if DEBUG:
|
||||
from pprint import pprint
|
||||
print('====> updates_data')
|
||||
pprint(data)
|
||||
self.updates = []
|
||||
self.temp.clear()
|
||||
self.model = data['model']
|
||||
for path in data['updates']:
|
||||
self.send_event('tiramisu-change', path)
|
||||
|
||||
def test_value(self,
|
||||
path: str,
|
||||
index: Optional[int],
|
||||
value: Any) -> bool:
|
||||
if isinstance(value, list):
|
||||
for val in value:
|
||||
if not self._test_value(path, index, val):
|
||||
# when a value is invalid, all are invalid
|
||||
return False
|
||||
return True
|
||||
return not self._test_value(path, index, value)
|
||||
|
||||
def _test_value(self,
|
||||
path: str,
|
||||
index:Optional[int],
|
||||
value: Any) -> bool:
|
||||
if not path in self.form or not 'pattern' in self.form[path]:
|
||||
return True
|
||||
if value is None or value is '':
|
||||
match = True
|
||||
else:
|
||||
if isinstance(value, int):
|
||||
value = str(value)
|
||||
match = self.form[path]['pattern'].search(value) is not None
|
||||
if not path in self.temp:
|
||||
self.temp[path] = {}
|
||||
if index is None:
|
||||
if not match:
|
||||
self.temp[path]['invalid'] = True
|
||||
self.temp[path]['error'] = ['invalid value']
|
||||
else:
|
||||
self.temp[path]['invalid'] = False
|
||||
else:
|
||||
if not str(index) in self.temp[path]:
|
||||
self.temp[path][str(index)] = {}
|
||||
if not match:
|
||||
self.temp[path][str(index)]['invalid'] = True
|
||||
self.temp[path][str(index)]['error'] = ['invalid value']
|
||||
else:
|
||||
self.temp[path][str(index)]['invalid'] = False
|
||||
return match
|
||||
|
||||
def set_dependencies(self,
|
||||
path: str,
|
||||
ori_value: Any,
|
||||
force_hide: bool,
|
||||
changes: List,
|
||||
index: Optional[int]=None) -> None:
|
||||
dependencies = self.form.get(path, {}).get('dependencies', {})
|
||||
if dependencies:
|
||||
if not isinstance(ori_value, list):
|
||||
self._set_dependencies(path, ori_value, dependencies, force_hide, changes, index)
|
||||
else:
|
||||
for idx, ori_val in enumerate(ori_value):
|
||||
self._set_dependencies(path, ori_val, dependencies, force_hide, changes, idx)
|
||||
|
||||
def _set_dependencies(self,
|
||||
path: str,
|
||||
ori_value: Any,
|
||||
dependencies: Dict,
|
||||
force_hide: bool,
|
||||
changes: List,
|
||||
index: Optional[int]) -> None:
|
||||
if ori_value in dependencies['expected']:
|
||||
expected = dependencies['expected'][ori_value]
|
||||
else:
|
||||
expected = dependencies['default']
|
||||
for action in ['hide', 'show']:
|
||||
expected_actions = expected.get(action)
|
||||
if expected_actions:
|
||||
if force_hide:
|
||||
display = True
|
||||
else:
|
||||
display = action == 'show'
|
||||
for expected_path in expected_actions:
|
||||
if expected_path not in self.temp:
|
||||
self.temp[expected_path] = {}
|
||||
old_hidden = self.is_hidden(expected_path,
|
||||
index)
|
||||
leader_path = None
|
||||
if index is not None:
|
||||
if str(index) not in self.temp[expected_path]:
|
||||
self.temp[expected_path][str(index)] = {}
|
||||
self.temp[expected_path][str(index)]['display'] = display
|
||||
else:
|
||||
self.temp[expected_path]['display'] = display
|
||||
schema = self.get_schema(expected_path)
|
||||
if schema['type'] == 'array':
|
||||
leader_path = next(iter(schema['properties'].keys()))
|
||||
if leader_path not in self.temp:
|
||||
self.temp[leader_path] = {}
|
||||
self.temp[leader_path]['display'] = display
|
||||
if old_hidden == display:
|
||||
if expected_path not in changes:
|
||||
changes.append(expected_path)
|
||||
if leader_path not in changes:
|
||||
changes.append(leader_path)
|
||||
value = self.get_value(expected_path, index)
|
||||
self.set_dependencies(expected_path, value, not display, changes, index)
|
||||
|
||||
def set_not_equal(self,
|
||||
path: str,
|
||||
value: Any,
|
||||
index: Optional[int]) -> None:
|
||||
not_equals = self.form.get(path, {}).get('not_equal', [])
|
||||
if not_equals:
|
||||
vals = []
|
||||
opts = []
|
||||
if isinstance(value, list):
|
||||
for val in value:
|
||||
vals.append(val)
|
||||
opts.append(path)
|
||||
else:
|
||||
vals.append(value)
|
||||
opts.append(path)
|
||||
for not_equal in not_equals:
|
||||
for path_ in not_equal['options']:
|
||||
if self.is_hidden(path_, index, permissive=True):
|
||||
raise PropertiesOptionError(None, {'hidden'}, None, opt_type='option')
|
||||
schema = self.get_schema(path_)
|
||||
p_value = self.get_value(path_)
|
||||
if isinstance(p_value, list):
|
||||
for val in p_value:
|
||||
vals.append(val)
|
||||
opts.append(path_)
|
||||
else:
|
||||
vals.append(p_value)
|
||||
opts.append(path_)
|
||||
equal = []
|
||||
warnings_only = not_equal.get('warnings', False)
|
||||
if warnings_only and 'warnings' not in self.global_model.get('properties', []):
|
||||
continue
|
||||
if warnings_only:
|
||||
msg = _('should be different from the value of {}')
|
||||
#msgcurr = _('value for {} should be different')
|
||||
else:
|
||||
msg = _('must be different from the value of {}')
|
||||
#msgcurr = _('value for {} must be different')
|
||||
for idx_inf, val_inf in enumerate(vals):
|
||||
for idx_sup, val_sup in enumerate(vals[idx_inf + 1:]):
|
||||
if val_inf == val_sup is not None:
|
||||
for opt_ in [opts[idx_inf], opts[idx_inf + idx_sup + 1]]:
|
||||
if opt_ not in equal:
|
||||
equal.append(opt_)
|
||||
if equal:
|
||||
equal_name = {}
|
||||
display_equal = []
|
||||
for opt_ in equal:
|
||||
display_equal.append('"' + self.get_schema(opt_)['title'] + '"')
|
||||
display_equal = display_list(display_equal)
|
||||
msg_ = msg.format(display_equal)
|
||||
is_demoting_error_warning = 'demoting_error_warning' in self.global_model.get('properties', [])
|
||||
if warnings_only or is_demoting_error_warning:
|
||||
paths = not_equal['options'] + [path]
|
||||
else:
|
||||
paths = [path]
|
||||
for path_ in paths:
|
||||
if path_ not in self.temp:
|
||||
self.temp[path_] = {}
|
||||
temp = self.temp[path_]
|
||||
if index is not None:
|
||||
if str(index) not in temp:
|
||||
temp[str(index)] = {}
|
||||
temp = temp[str(index)]
|
||||
if warnings_only:
|
||||
temp['hasWarnings'] = True
|
||||
temp.setdefault('warnings', []).append(msg_)
|
||||
else:
|
||||
if not is_demoting_error_warning:
|
||||
raise ValueError(msg)
|
||||
temp['invalid'] = True
|
||||
temp.setdefault('error', []).append(msg_)
|
||||
|
||||
def do_copy(self,
|
||||
path: str,
|
||||
index: Optional[int],
|
||||
value: Any,
|
||||
changes: List) -> None:
|
||||
copy = self.form.get(path, {}).get('copy')
|
||||
if copy:
|
||||
for opt in copy:
|
||||
owner = self.get_owner(opt, index)
|
||||
if owner == 'default':
|
||||
# do not change in self.temp, it's default value
|
||||
if self.model[opt]['value'] != value:
|
||||
if isinstance(value, list):
|
||||
value = value.copy()
|
||||
# self.model[opt]['value'] = value
|
||||
remote = self.form.get(opt, {}).get('remote', False)
|
||||
self.updates_value('modify', opt, index, value, remote, True)
|
||||
if not self.is_hidden(opt, index) and opt not in changes:
|
||||
changes.append(opt)
|
||||
|
||||
|
||||
def _check_raises_warnings(self, path, index, value, type, withwarning=True):
|
||||
subconfig_value = self.option(path, index).value
|
||||
if not subconfig_value.valid():
|
||||
is_demoting_error_warning = 'demoting_error_warning' in self.global_model.get('properties', [])
|
||||
for err in subconfig_value.error_message():
|
||||
if is_demoting_error_warning:
|
||||
warnings.warn_explicit(ValueErrorWarning(value,
|
||||
type,
|
||||
Option(path, path),
|
||||
'{0}'.format(err),
|
||||
index),
|
||||
ValueErrorWarning,
|
||||
'Option', 0)
|
||||
else:
|
||||
if path not in self.temp:
|
||||
self.temp[path] = {}
|
||||
if index is None:
|
||||
obj = self.temp[path]
|
||||
else:
|
||||
if str(index) not in self.temp[path]:
|
||||
self.temp[path][str(index)] = {}
|
||||
obj = self.temp[path][str(index)]
|
||||
obj['invalid'] = False
|
||||
raise ValueError(err)
|
||||
|
||||
if withwarning and subconfig_value.warning():
|
||||
for warn in subconfig_value.warning_message():
|
||||
warnings.warn_explicit(ValueErrorWarning(value,
|
||||
type,
|
||||
Option(path, path),
|
||||
'{0}'.format(warn),
|
||||
index),
|
||||
ValueErrorWarning,
|
||||
'Option', 0)
|
||||
|
||||
def send_data(self,
|
||||
updates):
|
||||
raise NotImplementedError('please implement send_data method')
|
||||
|
||||
def send_event(self,
|
||||
evt: str,
|
||||
path: str):
|
||||
pass
|
165
tiramisu_api/error.py
Normal file
165
tiramisu_api/error.py
Normal file
@ -0,0 +1,165 @@
|
||||
try:
|
||||
from tiramisu.error import APIError, ValueWarning, ValueOptionError, ValueErrorWarning, PropertiesOptionError, ConfigError, display_list
|
||||
except ModuleNotFoundError:
|
||||
import weakref
|
||||
from .i18n import _
|
||||
|
||||
def display_list(lst, separator='and', add_quote=False):
|
||||
if separator == 'and':
|
||||
separator = _('and')
|
||||
elif separator == 'or':
|
||||
separator = _('or')
|
||||
if isinstance(lst, tuple) or isinstance(lst, frozenset):
|
||||
lst = list(lst)
|
||||
if len(lst) == 1:
|
||||
ret = lst[0]
|
||||
if not isinstance(ret, str):
|
||||
ret = str(ret)
|
||||
if add_quote:
|
||||
ret = '"{}"'.format(ret)
|
||||
return ret
|
||||
else:
|
||||
lst.sort()
|
||||
lst_ = []
|
||||
for l in lst[:-1]:
|
||||
if not isinstance(l, str):
|
||||
l = str(l)
|
||||
if add_quote:
|
||||
l = '"{}"'.format(l)
|
||||
lst_.append(_(l))
|
||||
last = lst[-1]
|
||||
if not isinstance(last, str):
|
||||
last = str(_(last))
|
||||
if add_quote:
|
||||
last = '"{}"'.format(last)
|
||||
return ', '.join(lst_) + _(' {} ').format(separator) + '{}'.format(last)
|
||||
|
||||
#____________________________________________________________
|
||||
# Exceptions for a Config
|
||||
class ConfigError(Exception):
|
||||
"""attempt to change an option's owner without a value
|
||||
or in case of `_cfgimpl_descr` is None
|
||||
or if a calculation cannot be carried out"""
|
||||
def __init__(self,
|
||||
exp,
|
||||
ori_err=None):
|
||||
super().__init__(exp)
|
||||
self.ori_err = ori_err
|
||||
|
||||
class APIError(Exception):
|
||||
pass
|
||||
|
||||
class _CommonError:
|
||||
def __init__(self,
|
||||
val,
|
||||
display_type,
|
||||
opt,
|
||||
err_msg,
|
||||
index):
|
||||
self.val = val
|
||||
self.display_type = display_type
|
||||
self.opt = weakref.ref(opt)
|
||||
self.err_msg = err_msg
|
||||
self.index = index
|
||||
super().__init__(self.err_msg)
|
||||
|
||||
def __str__(self):
|
||||
try:
|
||||
msg = self.prefix
|
||||
except AttributeError:
|
||||
if self.opt() is None:
|
||||
self.prefix = ''
|
||||
else:
|
||||
self.prefix = self.tmpl.format(self.val,
|
||||
self.display_type,
|
||||
self.opt().impl_get_display_name())
|
||||
msg = self.prefix
|
||||
if self.err_msg:
|
||||
if msg:
|
||||
msg += ', {}'.format(self.err_msg)
|
||||
else:
|
||||
msg = self.err_msg
|
||||
if not msg:
|
||||
msg = _('invalid value')
|
||||
return msg
|
||||
|
||||
class ValueWarning(_CommonError, UserWarning):
|
||||
tmpl = _('attention, "{0}" could be an invalid {1} for "{2}"')
|
||||
|
||||
class ValueOptionError(_CommonError, ValueError):
|
||||
tmpl = _('"{0}" is an invalid {1} for "{2}"')
|
||||
|
||||
class ValueErrorWarning(ValueWarning):
|
||||
tmpl = _('"{0}" is an invalid {1} for "{2}"')
|
||||
|
||||
# Exceptions for an Option
|
||||
class PropertiesOptionError(AttributeError):
|
||||
"attempt to access to an option with a property that is not allowed"
|
||||
def __init__(self,
|
||||
option_bag,
|
||||
proptype,
|
||||
settings,
|
||||
opt_type=None,
|
||||
requires=None,
|
||||
name=None,
|
||||
orig_opt=None):
|
||||
if opt_type:
|
||||
self._opt_type = opt_type
|
||||
self._requires = requires
|
||||
self._name = name
|
||||
self._orig_opt = orig_opt
|
||||
else:
|
||||
if option_bag.option.impl_is_optiondescription():
|
||||
self._opt_type = 'optiondescription'
|
||||
else:
|
||||
self._opt_type = 'option'
|
||||
self._requires = option_bag.option.impl_getrequires()
|
||||
self._name = option_bag.option.impl_get_display_name()
|
||||
self._orig_opt = None
|
||||
self._option_bag = option_bag
|
||||
self.proptype = proptype
|
||||
self._settings = settings
|
||||
self.msg = None
|
||||
super(PropertiesOptionError, self).__init__(None)
|
||||
|
||||
def set_orig_opt(self, opt):
|
||||
self._orig_opt = opt
|
||||
|
||||
def __str__(self):
|
||||
# this part is a bit slow, so only execute when display
|
||||
if self.msg:
|
||||
return self.msg
|
||||
if self._option_bag is None:
|
||||
return "unknown error"
|
||||
req = self._settings.apply_requires(self._option_bag,
|
||||
True)
|
||||
# if req != {} or self._orig_opt is not None:
|
||||
if req != {}:
|
||||
only_one = len(req) == 1
|
||||
msg = []
|
||||
for action, msg_ in req.items():
|
||||
msg.append('"{0}" ({1})'.format(action, display_list(msg_)))
|
||||
msg = display_list(msg)
|
||||
else:
|
||||
only_one = len(self.proptype) == 1
|
||||
msg = display_list(list(self.proptype), add_quote=True)
|
||||
if only_one:
|
||||
prop_msg = _('property')
|
||||
else:
|
||||
prop_msg = _('properties')
|
||||
if self._orig_opt:
|
||||
self.msg = str(_('cannot access to {0} "{1}" because "{2}" has {3} {4}'
|
||||
'').format(self._opt_type,
|
||||
self._orig_opt.impl_get_display_name(),
|
||||
self._name,
|
||||
prop_msg,
|
||||
msg))
|
||||
else:
|
||||
self.msg = str(_('cannot access to {0} "{1}" because has {2} {3}'
|
||||
'').format(self._opt_type,
|
||||
self._name,
|
||||
prop_msg,
|
||||
msg))
|
||||
del self._requires, self._opt_type, self._name, self._option_bag
|
||||
del self._settings, self._orig_opt
|
||||
return self.msg
|
6
tiramisu_api/i18n.py
Normal file
6
tiramisu_api/i18n.py
Normal file
@ -0,0 +1,6 @@
|
||||
try:
|
||||
from tiramisu.i18n import _
|
||||
except ModuleNotFoundError:
|
||||
# FIXME
|
||||
def _(val):
|
||||
return val
|
11
tiramisu_api/setting.py
Normal file
11
tiramisu_api/setting.py
Normal file
@ -0,0 +1,11 @@
|
||||
try:
|
||||
from tiramisu.setting import undefined
|
||||
except ModuleNotFoundError:
|
||||
class Undefined(object):
|
||||
def __str__(self):
|
||||
return 'Undefined'
|
||||
|
||||
__repr__ = __str__
|
||||
|
||||
|
||||
undefined = Undefined()
|
Reference in New Issue
Block a user