risotto/src/risotto/message.py

640 lines
23 KiB
Python

from os import listdir
from os.path import join, basename, dirname, isfile
from glob import glob
from gettext import translation
try:
from tiramisu3 import StrOption, IntOption, BoolOption, ChoiceOption, OptionDescription, \
SymLinkOption, FloatOption, Calculation, Params, ParamOption, \
ParamValue, calc_value, calc_value_property_help, groups, Option
except:
from tiramisu import StrOption, IntOption, BoolOption, ChoiceOption, OptionDescription, \
SymLinkOption, FloatOption, Calculation, Params, ParamOption, \
ParamValue, calc_value, calc_value_property_help, groups, Option
from yaml import load, SafeLoader
from .config import get_config
from .utils import _
MESSAGE_ROOT_PATH = get_config()['global']['message_root_path']
groups.addgroup('message')
CUSTOMTYPES = None
MESSAGE_TRANSLATION = None
class DictOption(Option):
__slots__ = tuple()
_type = 'dict'
_display_name = _('dict')
def validate(self, value):
if not isinstance(value, dict):
raise ValueError()
class AnyOption(Option):
__slots__ = tuple()
_type = 'any value'
_display_name = _('any')
def validate(self, value):
pass
class MessageDefinition:
"""
A MessageDefinition is a representation of a message in the Zephir application messaging context
"""
__slots__ = ('version',
'message',
'description',
'parameters',
'default_roles',
'errors',
'pattern',
'related',
'response',
'options',
)
def __init__(self,
raw_def,
version,
message):
# default value for non mandatory key
self.version = version
self.parameters = {}
self.errors = []
self.related = []
self.default_roles = []
self.response = None
self.message = message
self.options = None
# loads yaml information into object
for key, value in raw_def.items():
if key == 'message':
raise Exception('message in not allowed in message')
if isinstance(value, str):
value = value.strip()
if key == 'pattern':
if value not in ['rpc', 'event']:
raise Exception(_('unknown pattern {}').format(value))
elif key == 'parameters':
if 'type' in value and isinstance(value['type'], str):
# should be a customtype
value = CUSTOMTYPES[self.version][value['type']].properties
else:
value = _parse_parameters(value,
self.version)
elif key == 'response':
value = ResponseDefinition(value,
self.version)
elif key == 'description':
value = value.strip().rstrip()
if value.endswith('.'):
value = value[:-1]
value = MESSAGE_TRANSLATION(value)
setattr(self, key, value)
# check mandatory keys
for key in self.__slots__:
try:
getattr(self, key)
except AttributeError:
raise Exception(_('mandatory key not set {} message').format(key))
class ParameterDefinition:
__slots__ = ('name',
'type',
'description',
'help',
'default',
'ref',
'shortarg')
def __init__(self,
name,
version,
raw_def):
self.name = name
# default value for non mandatory key
self.help = None
self.ref = None
self.shortarg = None
# loads yaml information into object
for key, value in raw_def.items():
if isinstance(value, str):
value = value.strip()
if key == 'type':
if value.startswith('[]'):
tvalue = value[2:]
else:
tvalue = value
if tvalue in CUSTOMTYPES[version]:
if value.startswith('[]'):
value = '[]{}'.format(CUSTOMTYPES[version][tvalue].type)
else:
value = CUSTOMTYPES[version][value].type
else:
self._valid_type(value)
#self._valid_type(value)
elif key == 'description':
if value.endswith('.'):
value = value[:-1]
value = MESSAGE_TRANSLATION(value)
setattr(self, key, value)
# check mandatory keys
for key in self.__slots__:
try:
getattr(self, key)
except AttributeError:
if key != 'default':
raise Exception(_('mandatory key not set "{}" in parameter').format(key))
def _valid_type(self, typ):
if typ.startswith('[]'):
self._valid_type(typ[2:])
elif typ not in ['Boolean', 'String', 'Number', 'File', 'Dict', 'Any', 'Float']:
raise Exception(_('unknown parameter type: {}').format(typ))
class ResponseDefinition:
"""
An ResponseDefinition is a representation of a response in the Zephir application messaging context
"""
__slots__ = ('description',
'type',
'ref',
'parameters',
'required',
'multi')
def __init__(self,
responses,
version):
self.ref = None
self.parameters = None
self.multi = False
self.required = []
for key, value in responses.items():
if key in ['parameters', 'required']:
raise Exception(_('parameters and required must be set with a custom type'))
elif key == 'type':
if value.startswith('[]'):
tvalue = value[2:]
self.multi = True
else:
tvalue = value
if tvalue in CUSTOMTYPES[version]:
self.parameters = CUSTOMTYPES[version][tvalue].properties
self.required = CUSTOMTYPES[version][tvalue].required
if value.startswith('[]'):
value = '[]{}'.format(CUSTOMTYPES[version][tvalue].type)
else:
value = CUSTOMTYPES[version][tvalue].type
self.description = CUSTOMTYPES[version][tvalue].description
else:
raise Exception('only customtype is supported in response')
elif key == 'description':
raise Exception('description is not allowed in response')
setattr(self, key, value)
# check mandatory keys
for key in self.__slots__:
try:
getattr(self, key)
except AttributeError:
raise Exception(_('mandatory key not set {}').format(key))
def _parse_parameters(raw_defs,
version):
parameters = {}
for name, raw_def in raw_defs.items():
parameters[name] = ParameterDefinition(name,
version,
raw_def)
return parameters
def get_message(uri: str,
current_module_names: str,
) -> MessageDefinition:
try:
version, message = uri.split('.', 1)
path = get_message_file_path(version,
message,
current_module_names)
with open(path, "r") as message_file:
return MessageDefinition(load(message_file.read(), Loader=SafeLoader),
version,
message)
except Exception as err:
import traceback
traceback.print_exc()
raise Exception(_(f'cannot parse message {uri}: {err}'))
def get_message_file_path(version,
message,
current_module_names):
module_name, filename = message.split('.', 1)
if current_module_names and module_name not in current_module_names:
raise Exception(f'should only load message for {current_module_names}, not {message}')
return join(MESSAGE_ROOT_PATH, version, module_name, 'messages', filename + '.yml')
def list_messages(uris,
current_module_names,
current_version,
):
def get_module_paths(current_module_names):
if current_module_names is None:
current_module_names = listdir(join(MESSAGE_ROOT_PATH, version))
for module_name in current_module_names:
yield module_name, join(MESSAGE_ROOT_PATH, version, module_name, 'messages')
if current_version:
versions = [current_version]
else:
versions = listdir(join(MESSAGE_ROOT_PATH))
versions.sort()
for version in versions:
for module_name, message_path in get_module_paths(current_module_names):
for message in listdir(message_path):
if message.endswith('.yml'):
uri = version + '.' + module_name + '.' + message.rsplit('.', 1)[0]
# if uris is not None, return only is in uris' list
if uris is not None and uri not in uris:
continue
yield uri
class CustomParam:
__slots__ = ('name',
'type',
'description',
'ref',
'default')
def __init__(self, name, raw_def, required):
self.name = name
self.ref = None
if name not in required:
self.default = None
# loads yaml information into object
for key, value in raw_def.items():
if isinstance(value, str):
value = value.strip()
if key == 'type':
value = self._convert_type(value, raw_def)
elif key == 'items':
continue
elif key == 'description':
if value.endswith('.'):
value = value[:-1]
value = MESSAGE_TRANSLATION(value)
setattr(self, key, value)
# check mandatory keys
for key in self.__slots__:
try:
getattr(self, key)
except AttributeError:
# default value for non mandatory key
if key != 'default':
raise Exception(_('mandatory key not set "{}" in parameter').format(key))
def _convert_type(self, typ, raw_def):
types = {'boolean': 'Boolean',
'string': 'String',
'number': 'Number',
'object': 'Dict',
'any': 'Any',
'array': 'Array',
'file': 'File',
'float': 'Float'}
if typ not in list(types.keys()):
# validate after
return typ
if typ == 'array' and 'items' in raw_def:
if not isinstance(raw_def['items'], dict):
raise Exception(_('items must be a dict'))
if list(raw_def['items'].keys()) != ['type']:
raise Exception(_('items must be a dict with only a type'))
return '[]{}'.format(self._convert_type(raw_def['items']['type'], raw_def))
return types[typ]
def _parse_custom_params(raw_defs, required):
parameters = {}
for name, raw_def in raw_defs.items():
parameters[name] = CustomParam(name, raw_def, required)
return parameters
class CustomType:
__slots__ = ('title',
'type',
'description',
'ref',
'properties',
'required')
def __init__(self, raw_def):
# default value for non mandatory key
self.ref = None
# loads yaml information into object
for key, value in raw_def.items():
if isinstance(value, str):
value = value.strip()
if key == 'type':
value = self._convert_type(value, raw_def)
elif key == 'properties':
value = _parse_custom_params(value, raw_def.get('required', {}))
elif key == 'description':
if value.endswith('.'):
value = value[:-1]
value = MESSAGE_TRANSLATION(value)
setattr(self, key, value)
# check mandatory keys
for key in self.__slots__:
try:
getattr(self, key)
except AttributeError:
raise Exception(_('mandatory key not set "{}" in parameter').format(key))
def _convert_type(self, typ, raw_def):
types = {'boolean': 'Boolean',
'string': 'String',
'number': 'Number',
'object': 'Dict',
'array': 'Array'}
if typ not in list(types.keys()):
# validate after
return typ
if typ == 'array' and 'items' in raw_def:
if not isinstance(raw_def['items'], dict):
raise Exception(_('items must be a dict'))
if list(raw_def['items'].keys()) != ['type']:
raise Exception(_('items must be a dict with only a type'))
return '[]{}'.format(self._convert_type(raw_def['items']['type'], raw_def))
return types[typ]
def getname(self):
return self.title
def load_customtypes() -> None:
versions = listdir(MESSAGE_ROOT_PATH)
versions.sort()
ret = {}
for version in versions:
if version not in ret:
ret[version] = {}
for current_module_name in listdir(join(MESSAGE_ROOT_PATH, version)):
types_path = join(MESSAGE_ROOT_PATH,
version,
current_module_name,
'types')
for message in listdir(types_path):
if message.endswith('.yml'):
path = join(types_path, message)
# remove extension
message = message.rsplit('.', 1)[0]
with open(path, "r") as message_file:
try:
custom_type = CustomType(load(message_file, Loader=SafeLoader))
ret[version][custom_type.getname()] = custom_type
except Exception as err:
raise Exception(_(f'enable to load type "{message}": {err}'))
return ret
def _get_description(description,
name):
# hack because some description are, in fact some help
if not '\n' in description and len(description) <= 150:
doc = description
else:
doc = name
if doc.endswith('.'):
doc= description[:-1]
doc = MESSAGE_TRANSLATION(doc)
return doc
def _get_option(name,
arg,
uri,
select_option,
):
"""generate option
"""
props = []
if not hasattr(arg, 'default'):
props.append('mandatory')
if select_option:
props.append(Calculation(calc_value,
Params(ParamValue('disabled'),
kwargs={'condition': ParamOption(select_option, todict=True),
'expected': ParamValue(uri),
'reverse_condition': ParamValue(True)}),
calc_value_property_help))
props.append('notunique')
description = arg.description.strip().rstrip()
kwargs = {'name': name,
'doc': _get_description(description, name),
'properties': frozenset(props),
#'multi': arg.multi,
}
if hasattr(arg, 'default'):
kwargs['default'] = arg.default
type_ = arg.type
if type_.startswith('[]'):
kwargs['multi'] = True
type_ = type_[2:]
if type_ == 'Dict':
obj = DictOption(**kwargs)
elif type_ == 'String':
obj = StrOption(**kwargs)
elif type_ == 'Any':
obj = AnyOption(**kwargs)
elif 'Number' in type_ or type_ == 'ID' or type_ == 'Integer':
obj = IntOption(**kwargs)
elif type_ == 'Boolean':
obj = BoolOption(**kwargs)
elif type_ == 'Float':
obj = FloatOption(**kwargs)
else:
raise Exception('unsupported type {} in {}'.format(type_, uri))
obj.impl_set_information('ref', arg.ref)
return obj
def get_options(message_def,
uri,
select_option,
load_shortarg,
):
"""build option with args/kwargs
"""
options =[]
for name, arg in message_def.parameters.items():
current_opt = _get_option(name,
arg,
uri,
select_option,
)
options.append(current_opt)
if hasattr(arg, 'shortarg') and arg.shortarg and load_shortarg:
options.append(SymLinkOption(arg.shortarg, current_opt))
return options
def _parse_responses(message_def,
uri,
):
"""build option with returns
"""
if message_def.response.parameters is None:
raise Exception(f'message "{message_def.message}" did not returned any valid parameters')
options = []
names = []
for name, obj in message_def.response.parameters.items():
if name in names:
raise Exception(f'multi response with name "{name}" in "{uri}"')
names.append(name)
kwargs = {'name': name,
'doc': obj.description.strip().rstrip()}
type_ = obj.type
if type_.startswith('[]'):
kwargs['multi'] = True
type_ = type_[2:]
option = {'String': StrOption,
'Number': IntOption,
'Boolean': BoolOption,
'Dict': DictOption,
'Any': AnyOption,
'Float': FloatOption,
# FIXME
'File': StrOption}.get(type_)
if not option:
raise Exception(f'unknown param type {obj.type} in responses of message {message_def.message}')
if hasattr(obj, 'default'):
kwargs['default'] = obj.default
kwargs['properties'] = ('notunique',)
else:
kwargs['properties'] = ('mandatory', 'notunique')
options.append(option(**kwargs))
od = OptionDescription(uri,
message_def.response.description,
options,
)
od.impl_set_information('multi', message_def.response.multi)
return od
def _get_root_option(select_option,
optiondescriptions,
):
"""get root option
"""
def _get_od(curr_ods):
options = []
for name in curr_ods.keys():
if name is None:
description, curr_options = curr_ods[name]
options.extend(curr_ods[name][1])
else:
if None in curr_ods[name].keys():
description = curr_ods[name][None][0]
if description.endswith('.'):
description = description[:-1]
else:
description = None
od = OptionDescription(name,
description,
_get_od(curr_ods[name]))
if None in list(curr_ods[name].keys()):
od.impl_set_group_type(groups.message)
options.append(od)
return options
options_obj = [select_option]
struct_od = {}
for od_name, options_descr in optiondescriptions.items():
# od_name is something like config.configuration.server.get
curr_od = struct_od
for subod in od_name.split('.'):
curr_od.setdefault(subod, {})
curr_od = curr_od[subod]
# curr_od is now {'config': {'configuration': {server: {}}}}
curr_od[None] = options_descr
# curr_od is now {'config': {'configuration': {server: {None: options_descr}}}}
options_obj.extend(_get_od(struct_od))
return OptionDescription('root', 'root', options_obj)
def get_messages(current_module_names,
load_shortarg=False,
current_version=None,
uris=None,
):
"""generate description from yml files
"""
global MESSAGE_TRANSLATION, CUSTOMTYPES
if MESSAGE_TRANSLATION is None:
MESSAGE_TRANSLATION = translation('risotto-message', join(MESSAGE_ROOT_PATH, '..', 'locale')).gettext
if CUSTOMTYPES is None:
CUSTOMTYPES = load_customtypes()
optiondescriptions = {}
optiondescriptions_info = {}
messages = list(list_messages(uris,
current_module_names,
current_version,
))
messages.sort()
# optiondescriptions_name = [message_name.split('.', 1)[1] for message_name in messages]
select_option = ChoiceOption('message',
'Nom du message.',
tuple(messages),
properties=frozenset(['mandatory', 'positional', 'notunique']))
for uri in messages:
message_def = get_message(uri,
current_module_names,
)
optiondescriptions_info[message_def.message] = {'pattern': message_def.pattern,
'default_roles': message_def.default_roles,
'version': message_def.version,
}
if message_def.pattern == 'rpc':
if not message_def.response:
raise Exception(f'rpc without response is not allowed {uri}')
optiondescriptions_info[message_def.message]['response'] = _parse_responses(message_def,
uri,
)
elif message_def.response:
raise Exception(f'response is not allowed for {uri}')
message_def.options = get_options(message_def,
uri,
select_option,
load_shortarg,
)
optiondescriptions[uri] = (message_def.description, message_def.options)
root = _get_root_option(select_option,
optiondescriptions,
)
return optiondescriptions_info, root