634 lines
23 KiB
Python
634 lines
23 KiB
Python
from os import listdir
|
|
from os.path import join, basename, dirname, isfile
|
|
from glob import glob
|
|
from gettext import translation
|
|
try:
|
|
from tiramisu3 import StrOption, IntOption, BoolOption, ChoiceOption, OptionDescription, \
|
|
SymLinkOption, FloatOption, Calculation, Params, ParamOption, \
|
|
ParamValue, calc_value, calc_value_property_help, groups, Option
|
|
except:
|
|
from tiramisu import StrOption, IntOption, BoolOption, ChoiceOption, OptionDescription, \
|
|
SymLinkOption, FloatOption, Calculation, Params, ParamOption, \
|
|
ParamValue, calc_value, calc_value_property_help, groups, Option
|
|
from yaml import load, SafeLoader
|
|
|
|
|
|
from .config import get_config
|
|
from .utils import _
|
|
|
|
|
|
# Root directory of the message definitions, read from the 'global' section
# of the configuration. The loaders below join it with a version name, then a
# module name, then a 'messages' or 'types' sub-directory.
MESSAGE_ROOT_PATH = get_config()['global']['message_root_path']
|
|
# Register a 'message' group on tiramisu's `groups`; _get_root_option() later
# applies `groups.message` to the OptionDescription built for each message.
groups.addgroup('message')
|
|
# gettext lookup function for the 'risotto-message' domain, with catalogs
# searched in the 'locale' directory next to MESSAGE_ROOT_PATH. It is used to
# translate the descriptions read from the YAML files.
MESSAGE_TRANSLATION = translation('risotto-message', join(MESSAGE_ROOT_PATH, '..', 'locale')).gettext
|
|
|
|
|
|
|
|
class DictOption(Option):
    """Option whose value has to be a ``dict`` instance."""
    __slots__ = ()
    _type = 'dict'
    _display_name = _('dict')

    def validate(self, value):
        """Raise ``ValueError`` when ``value`` is not a dict."""
        if isinstance(value, dict):
            return
        raise ValueError()
|
|
|
|
|
|
class AnyOption(Option):
    """Option that performs no validation: every value is accepted."""
    __slots__ = ()
    _type = 'any value'
    _display_name = _('any')

    def validate(self, value):
        """Accept ``value`` unconditionally."""
        return None
|
|
|
|
|
|
class MessageDefinition:
    """
    A MessageDefinition is a representation of a message in the Zephir application messaging context

    It is built from the content of a message YAML file (``raw_def``): every
    key of the mapping becomes an attribute, after a key-specific
    normalisation. Keys without a default ('description' and 'pattern') are
    mandatory.
    """
    __slots__ = (
        'version',
        'message',
        'description',
        'parameters',
        'default_roles',
        'errors',
        'pattern',
        'related',
        'response',
        'options',
    )

    def __init__(self,
                 raw_def,
                 version,
                 message):
        self.version = version
        self.message = message
        # defaults for the keys a definition may omit
        self.parameters = {}
        self.errors = []
        self.related = []
        self.default_roles = []
        self.response = None
        self.options = None

        # copy the yaml content onto the object, one key at a time
        for key, raw_value in raw_def.items():
            if key == 'message':
                raise Exception('message in not allowed in message')
            setattr(self, key, self._normalize(key, raw_value))

        # every slot must have a value by now
        for slot in self.__slots__:
            if not hasattr(self, slot):
                raise Exception(_('mandatory key not set {} message').format(slot))

    def _normalize(self, key, value):
        """Return the value to store for ``key``.

        Strings are stripped; 'pattern' is checked against the known
        patterns; 'parameters' and 'response' are converted into definition
        objects; 'description' loses one trailing period and is translated.
        """
        if isinstance(value, str):
            value = value.strip()
        if key == 'pattern':
            if value not in ('rpc', 'event'):
                raise Exception(_('unknown pattern {}').format(value))
            return value
        if key == 'parameters':
            if 'type' in value and isinstance(value['type'], str):
                # a string type here refers to a custom type: reuse its properties
                return CUSTOMTYPES[self.version][value['type']].properties
            return _parse_parameters(value, self.version)
        if key == 'response':
            return ResponseDefinition(value, self.version)
        if key == 'description':
            text = value.strip()
            if text.endswith('.'):
                text = text[:-1]
            return MESSAGE_TRANSLATION(text)
        return value
|
|
|
|
|
|
class ParameterDefinition:
    """Description of one message parameter, built from its YAML mapping.

    'help', 'ref' and 'shortarg' default to ``None``; 'default' may stay
    unset; every other slot must be provided by ``raw_def``.
    """
    __slots__ = (
        'name',
        'type',
        'description',
        'help',
        'default',
        'ref',
        'shortarg',
    )

    def __init__(self,
                 name,
                 version,
                 raw_def):
        self.name = name
        # optional keys
        self.help = None
        self.ref = None
        self.shortarg = None

        # copy the yaml content onto the object
        for key, value in raw_def.items():
            if isinstance(value, str):
                value = value.strip()
            if key == 'type':
                is_list = value.startswith('[]')
                base_type = value[2:] if is_list else value
                if base_type in CUSTOMTYPES[version]:
                    # replace a custom type name by the type it declares
                    resolved = CUSTOMTYPES[version][base_type].type
                    value = '[]{}'.format(resolved) if is_list else resolved
                else:
                    self._valid_type(value)
            elif key == 'description':
                if value.endswith('.'):
                    value = value[:-1]
                value = MESSAGE_TRANSLATION(value)
            setattr(self, key, value)

        # 'default' is the only slot allowed to stay unset
        for slot in self.__slots__:
            if slot != 'default' and not hasattr(self, slot):
                raise Exception(_('mandatory key not set "{}" in parameter').format(slot))

    def _valid_type(self, typ):
        """Raise if ``typ``, once its '[]' prefixes are removed, is not a known type."""
        base_type = typ
        while base_type.startswith('[]'):
            base_type = base_type[2:]
        if base_type not in ('Boolean', 'String', 'Number', 'File', 'Dict', 'Any', 'Float'):
            raise Exception(_('unknown parameter type: {}').format(base_type))
|
|
|
|
|
|
class ResponseDefinition:
    """
    A ResponseDefinition is a representation of a response in the Zephir application messaging context

    A response can only refer to a custom type: its parameters, required
    names and description are all taken from that custom type.
    """
    __slots__ = (
        'description',
        'type',
        'ref',
        'parameters',
        'required',
        'multi',
    )

    def __init__(self,
                 responses,
                 version):
        self.ref = None
        self.parameters = None
        self.multi = False
        self.required = []
        for key, value in responses.items():
            if key in ('parameters', 'required'):
                raise Exception(_('parameters and required must be set with a custom type'))
            if key == 'description':
                raise Exception('description is not allowed in response')
            if key == 'type':
                value = self._resolve_type(value, version)
            setattr(self, key, value)

        # every slot must have a value by now
        for slot in self.__slots__:
            if not hasattr(self, slot):
                raise Exception(_('mandatory key not set {}').format(slot))

    def _resolve_type(self, declared, version):
        """Fill the response from the custom type named by ``declared``.

        A leading '[]' marks a multi response. Returns the type string of
        the custom type, keeping the '[]' prefix when there was one.
        """
        is_list = declared.startswith('[]')
        if is_list:
            self.multi = True
        type_name = declared[2:] if is_list else declared
        if type_name not in CUSTOMTYPES[version]:
            raise Exception('only customtype is supported in response')
        custom_type = CUSTOMTYPES[version][type_name]
        self.parameters = custom_type.properties
        self.required = custom_type.required
        resolved = '[]{}'.format(custom_type.type) if is_list else custom_type.type
        self.description = custom_type.description
        return resolved
|
|
|
|
|
|
def _parse_parameters(raw_defs,
                      version):
    """Build a ``{name: ParameterDefinition}`` dict from the raw yaml mapping."""
    return {
        name: ParameterDefinition(name, version, raw_def)
        for name, raw_def in raw_defs.items()
    }
|
|
|
|
|
|
def get_message(uri: str,
                current_module_names: str,
                ) -> MessageDefinition:
    """Load and parse the message definition identified by ``uri``.

    ``uri`` has the form ``<version>.<module>.<message name>``; the part
    after the first dot is resolved to a YAML file by
    ``get_message_file_path``, which also checks the module against
    ``current_module_names``.

    Raises:
        Exception: for any failure (malformed uri, unreadable file, invalid
            definition). The traceback of the original error is printed and
            that error is chained as the cause.
    """
    try:
        version, message = uri.split('.', 1)
        path = get_message_file_path(version,
                                     message,
                                     current_module_names)
        with open(path, "r") as message_file:
            return MessageDefinition(load(message_file.read(), Loader=SafeLoader),
                                     version,
                                     message)
    except Exception as err:
        import traceback
        traceback.print_exc()
        # translate the constant template, then format it: an f-string would
        # be interpolated before the gettext lookup and never match a catalog
        raise Exception(_('cannot parse message {}: {}').format(uri, err)) from err
|
|
|
|
|
|
def get_message_file_path(version,
                          message,
                          current_module_names):
    """Return the path of the YAML file describing ``message``.

    ``message`` is ``<module>.<name>``. When ``current_module_names`` is
    non-empty the module has to be part of it, otherwise an Exception is
    raised.
    """
    module_name, filename = message.split('.', 1)
    module_allowed = not current_module_names or module_name in current_module_names
    if not module_allowed:
        raise Exception(f'should only load message for {current_module_names}, not {message}')
    return join(MESSAGE_ROOT_PATH, version, module_name, 'messages', f'{filename}.yml')
|
|
|
|
|
|
def list_messages(uris,
                  current_module_names,
                  current_version,
                  ):
    """Yield the uri of every message file found under ``MESSAGE_ROOT_PATH``.

    Only ``current_version`` is scanned when it is set, otherwise every
    version directory, in sorted order. Modules are those listed in
    ``current_module_names``, or every module directory of the version when
    it is ``None``. When ``uris`` is not ``None`` only the uris it contains
    are yielded.
    """
    if current_version:
        versions = [current_version]
    else:
        versions = sorted(listdir(join(MESSAGE_ROOT_PATH)))
    for version in versions:
        version_path = join(MESSAGE_ROOT_PATH, version)
        module_names = current_module_names
        if module_names is None:
            module_names = listdir(version_path)
        for module_name in module_names:
            message_path = join(version_path, module_name, 'messages')
            for filename in listdir(message_path):
                if not filename.endswith('.yml'):
                    continue
                # uri is <version>.<module>.<file name without extension>
                uri = '.'.join((version, module_name, filename.rsplit('.', 1)[0]))
                if uris is not None and uri not in uris:
                    continue
                yield uri
|
|
|
|
|
|
class CustomParam:
    """One property of a custom type, built from its YAML mapping.

    A property whose name is not in ``required`` gets a ``None`` default;
    for a required one the 'default' slot may stay unset.
    """
    __slots__ = (
        'name',
        'type',
        'description',
        'ref',
        'default',
    )

    def __init__(self, name, raw_def, required):
        self.name = name
        self.ref = None
        if name not in required:
            self.default = None

        # copy the yaml content onto the object
        for key, value in raw_def.items():
            if key == 'items':
                # only consumed through _convert_type(), never stored
                continue
            if isinstance(value, str):
                value = value.strip()
            if key == 'type':
                value = self._convert_type(value, raw_def)
            elif key == 'description':
                if value.endswith('.'):
                    value = value[:-1]
                value = MESSAGE_TRANSLATION(value)
            setattr(self, key, value)

        # 'default' is the only slot allowed to stay unset
        for slot in self.__slots__:
            if slot != 'default' and not hasattr(self, slot):
                raise Exception(_('mandatory key not set "{}" in parameter').format(slot))

    def _convert_type(self, typ, raw_def):
        """Translate a yaml type name into the internal type name.

        Unknown names are returned unchanged (they are validated later). An
        'array' with an 'items' mapping becomes '[]<item type>'.
        """
        known_types = {
            'boolean': 'Boolean',
            'string': 'String',
            'number': 'Number',
            'object': 'Dict',
            'array': 'Array',
            'file': 'File',
            'float': 'Float',
        }
        if typ not in tuple(known_types):
            # validate after
            return typ
        if typ != 'array' or 'items' not in raw_def:
            return known_types[typ]
        items = raw_def['items']
        if not isinstance(items, dict):
            raise Exception(_('items must be a dict'))
        if list(items) != ['type']:
            raise Exception(_('items must be a dict with only a type'))
        # NOTE(review): an item type of 'array' recurses with the same raw_def
        item_type = self._convert_type(items['type'], raw_def)
        return '[]{}'.format(item_type)
|
|
|
|
|
|
def _parse_custom_params(raw_defs, required):
    """Build a ``{name: CustomParam}`` dict from the raw yaml properties."""
    return {
        name: CustomParam(name, raw_def, required)
        for name, raw_def in raw_defs.items()
    }
|
|
|
|
|
|
class CustomType:
    """A custom type read from a YAML type file.

    Every slot except 'ref' (default ``None``) has to be provided by the
    yaml mapping.
    """
    __slots__ = (
        'title',
        'type',
        'description',
        'ref',
        'properties',
        'required',
    )

    def __init__(self, raw_def):
        # optional key
        self.ref = None

        # copy the yaml content onto the object
        for key, value in raw_def.items():
            if isinstance(value, str):
                value = value.strip()
            if key == 'type':
                value = self._convert_type(value, raw_def)
            elif key == 'properties':
                value = _parse_custom_params(value, raw_def.get('required', {}))
            elif key == 'description':
                if value.endswith('.'):
                    value = value[:-1]
                value = MESSAGE_TRANSLATION(value)
            setattr(self, key, value)

        # every slot must have a value by now
        for slot in self.__slots__:
            if not hasattr(self, slot):
                raise Exception(_('mandatory key not set "{}" in parameter').format(slot))

    def _convert_type(self, typ, raw_def):
        """Translate a yaml type name into the internal type name.

        Unknown names are returned unchanged (they are validated later). An
        'array' with an 'items' mapping becomes '[]<item type>'.
        """
        known_types = {
            'boolean': 'Boolean',
            'string': 'String',
            'number': 'Number',
            'object': 'Dict',
            'array': 'Array',
        }
        if typ not in tuple(known_types):
            # validate after
            return typ
        if typ != 'array' or 'items' not in raw_def:
            return known_types[typ]
        items = raw_def['items']
        if not isinstance(items, dict):
            raise Exception(_('items must be a dict'))
        if list(items) != ['type']:
            raise Exception(_('items must be a dict with only a type'))
        # NOTE(review): an item type of 'array' recurses with the same raw_def
        item_type = self._convert_type(items['type'], raw_def)
        return '[]{}'.format(item_type)

    def getname(self):
        """Return the title of the custom type."""
        return self.title
|
|
|
|
|
|
def load_customtypes() -> dict:
    """Load every custom type definition found under ``MESSAGE_ROOT_PATH``.

    Each ``<version>/<module>/types/*.yml`` file is parsed with the YAML
    safe loader and turned into a ``CustomType``.

    Returns:
        A dict mapping each version name to a ``{type title: CustomType}``
        dict.

    Raises:
        Exception: when a type file cannot be parsed or describes an invalid
            custom type; the original error is chained as the cause.
    """
    custom_types = {}
    for version in sorted(listdir(MESSAGE_ROOT_PATH)):
        version_types = custom_types.setdefault(version, {})
        for module_name in listdir(join(MESSAGE_ROOT_PATH, version)):
            types_path = join(MESSAGE_ROOT_PATH,
                              version,
                              module_name,
                              'types')
            for filename in listdir(types_path):
                if not filename.endswith('.yml'):
                    continue
                # file name without its extension, only used in the error message
                type_name = filename.rsplit('.', 1)[0]
                with open(join(types_path, filename), "r") as type_file:
                    try:
                        custom_type = CustomType(load(type_file, Loader=SafeLoader))
                        version_types[custom_type.getname()] = custom_type
                    except Exception as err:
                        # translate the constant template first, then format it
                        raise Exception(_('unable to load type "{}": {}').format(type_name, err)) from err
    return custom_types
|
|
|
|
|
|
def _get_description(description,
                     name):
    """Return the translated short label of an option.

    ``description`` is used when it is a single line of at most 150
    characters; otherwise ``name`` is used instead. One trailing period is
    removed from the chosen label before it is passed to
    ``MESSAGE_TRANSLATION``.
    """
    # hack because some descriptions are, in fact, help texts
    if '\n' not in description and len(description) <= 150:
        doc = description
    else:
        doc = name
    if doc.endswith('.'):
        # strip the period from the chosen label itself: slicing
        # ``description`` here would return the long help text whenever the
        # fallback ``name`` ends with a period
        doc = doc[:-1]
    return MESSAGE_TRANSLATION(doc)
|
|
|
|
|
|
def _get_option(name,
                arg,
                uri,
                select_option,
                ):
    """Generate the option describing parameter ``arg`` of message ``uri``.

    The option is mandatory when ``arg`` has no 'default' attribute. A type
    starting with '[]' produces a multi option. The 'ref' of ``arg`` is
    stored as an information on the returned option.
    """
    has_default = hasattr(arg, 'default')
    properties = [] if has_default else ['mandatory']
    if select_option:
        # presumably disables the option unless the selected message is
        # ``uri`` -- confirm against tiramisu's calc_value documentation
        calculation_params = Params(ParamValue('disabled'),
                                    kwargs={'condition': ParamOption(select_option, todict=True),
                                            'expected': ParamValue(uri),
                                            'reverse_condition': ParamValue(True)})
        properties.append(Calculation(calc_value,
                                      calculation_params,
                                      calc_value_property_help))

    description = arg.description.strip()
    kwargs = {
        'name': name,
        'doc': _get_description(description, name),
        'properties': frozenset(properties),
    }
    if has_default:
        kwargs['default'] = arg.default

    type_ = arg.type
    if type_.startswith('[]'):
        kwargs['multi'] = True
        type_ = type_[2:]

    exact_types = {
        'Dict': DictOption,
        'String': StrOption,
        'Any': AnyOption,
        'ID': IntOption,
        'Integer': IntOption,
        'Boolean': BoolOption,
        'Float': FloatOption,
    }
    option_class = exact_types.get(type_)
    if option_class is None and 'Number' in type_:
        # any type name containing 'Number' is handled as an integer
        option_class = IntOption
    if option_class is None:
        raise Exception('unsupported type {} in {}'.format(type_, uri))

    obj = option_class(**kwargs)
    obj.impl_set_information('ref', arg.ref)
    return obj
|
|
|
|
|
|
def get_options(message_def,
                uri,
                select_option,
                load_shortarg,
                ):
    """Build the options of a message from its parameters.

    One option is created per parameter. When ``load_shortarg`` is true and
    the parameter has a non-empty 'shortarg', a ``SymLinkOption`` named after
    it is added right after the option it points to.
    """
    options = []
    for name, arg in message_def.parameters.items():
        option = _get_option(name,
                             arg,
                             uri,
                             select_option,
                             )
        options.append(option)
        shortarg = getattr(arg, 'shortarg', None)
        if shortarg and load_shortarg:
            options.append(SymLinkOption(shortarg, option))
    return options
|
|
|
|
|
|
def _parse_responses(message_def,
                     uri,
                     ):
    """Build the OptionDescription describing the response of a message.

    One option is created per response parameter; a parameter without a
    'default' attribute is mandatory. The 'multi' flag of the response is
    stored as an information on the returned OptionDescription.
    """
    response = message_def.response
    if response.parameters is None:
        raise Exception(f'message "{message_def.message}" did not returned any valid parameters')

    option_classes = {
        'String': StrOption,
        'Number': IntOption,
        'Boolean': BoolOption,
        'Dict': DictOption,
        'Float': FloatOption,
        # FIXME
        'File': StrOption,
    }
    options = []
    seen_names = set()
    for name, obj in response.parameters.items():
        if name in seen_names:
            raise Exception(f'multi response with name "{name}" in "{uri}"')
        seen_names.add(name)

        kwargs = {'name': name,
                  'doc': obj.description.strip()}
        type_ = obj.type
        if type_.startswith('[]'):
            kwargs['multi'] = True
            type_ = type_[2:]
        option_class = option_classes.get(type_)
        if not option_class:
            raise Exception(f'unknown param type {obj.type} in responses of message {message_def.message}')
        if hasattr(obj, 'default'):
            kwargs['default'] = obj.default
        else:
            kwargs['properties'] = ('mandatory',)
        options.append(option_class(**kwargs))

    od = OptionDescription(uri,
                           response.description,
                           options,
                           )
    od.impl_set_information('multi', response.multi)
    return od
|
|
|
|
|
|
def _get_root_option(select_option,
                     optiondescriptions,
                     ):
    """Build the root OptionDescription from the per-message options.

    ``optiondescriptions`` maps a dotted uri to a ``(description, options)``
    pair. The uris are turned into a tree of nested OptionDescription, one
    level per dotted component; ``select_option`` is the first child of the
    root.
    """
    def _build(tree):
        # convert one level of the tree into a list of options
        built = []
        for name, subtree in tree.items():
            if name is None:
                # the None key holds the (description, options) pair of a message
                _description, message_options = subtree
                built.extend(message_options)
                continue
            is_message = None in subtree
            if is_message:
                description = subtree[None][0]
                if description.endswith('.'):
                    description = description[:-1]
            else:
                description = None
            od = OptionDescription(name,
                                   description,
                                   _build(subtree))
            if is_message:
                od.impl_set_group_type(groups.message)
            built.append(od)
        return built

    tree = {}
    for od_name, options_descr in optiondescriptions.items():
        # od_name is something like config.configuration.server.get
        node = tree
        for part in od_name.split('.'):
            node = node.setdefault(part, {})
        # the tree is now {'config': {'configuration': {'server': {'get': {}}}}}
        # and node is the innermost dict; store the message under the None key
        node[None] = options_descr

    root_options = [select_option]
    root_options.extend(_build(tree))
    return OptionDescription('root', 'root', root_options)
|
|
|
|
|
|
def get_messages(current_module_names,
                 load_shortarg=False,
                 current_version=None,
                 uris=None,
                 ):
    """Generate the option tree and the message information from yml files.

    Returns a ``(info, root)`` pair: ``info`` maps each message name to its
    pattern, default roles and version (plus the response description for
    an 'rpc' message); ``root`` is the root OptionDescription.

    Raises an Exception when an 'rpc' message has no response, or when a
    message with another pattern has one.
    """
    messages = sorted(list_messages(uris,
                                    current_module_names,
                                    current_version,
                                    ))
    select_option = ChoiceOption('message',
                                 'Nom du message.',
                                 tuple(messages),
                                 properties=frozenset(['mandatory', 'positional']))
    optiondescriptions = {}
    optiondescriptions_info = {}
    for uri in messages:
        message_def = get_message(uri,
                                  current_module_names,
                                  )
        info = {'pattern': message_def.pattern,
                'default_roles': message_def.default_roles,
                'version': message_def.version,
                }
        optiondescriptions_info[message_def.message] = info
        if message_def.pattern == 'rpc':
            if not message_def.response:
                raise Exception(f'rpc without response is not allowed {uri}')
            info['response'] = _parse_responses(message_def,
                                                uri,
                                                )
        elif message_def.response:
            raise Exception(f'response is not allowed for {uri}')
        message_def.options = get_options(message_def,
                                          uri,
                                          select_option,
                                          load_shortarg,
                                          )
        optiondescriptions[uri] = (message_def.description, message_def.options)

    root = _get_root_option(select_option,
                            optiondescriptions,
                            )
    return optiondescriptions_info, root
|
|
|
|
|
|
# Custom types indexed by version, then by type title. Loaded once when the
# module is imported; the definition classes above look it up at call time.
CUSTOMTYPES = load_customtypes()
|