"""Load message and custom-type definitions from YAML files and expose
them as Tiramisu options.

Message files are read from ``MESSAGE_ROOT_PATH/<version>/messages/*.yml``
and custom types from ``MESSAGE_ROOT_PATH/<version>/types/*.yml``.
"""
import traceback
from collections import OrderedDict
from glob import glob
from os import listdir
from os.path import isfile
from os.path import join, basename, dirname

from tiramisu import StrOption, IntOption, BoolOption, ChoiceOption, OptionDescription, SymLinkOption, \
    Config, Calculation, Params, ParamOption, ParamValue, calc_value, calc_value_property_help, \
    groups, Option
from yaml import load, SafeLoader

from ..config import MESSAGE_ROOT_PATH
from ..utils import _


groups.addgroup('message')


class DictOption(Option):
    """Option whose value must be a ``dict``."""
    __slots__ = tuple()
    _type = 'dict'
    _display_name = _('dict')

    def validate(self, value):
        """Raise ``ValueError`` when *value* is not a ``dict``."""
        if not isinstance(value, dict):
            raise ValueError()


class AnyOption(Option):
    """Option that accepts any value (no validation at all)."""
    __slots__ = tuple()
    _type = 'any value'
    _display_name = _('any')

    def validate(self, value):
        """Accept every value."""
        pass


class MessageDefinition:
    """
    A MessageDefinition is a representation of a message in the Zephir application messaging context
    """
    __slots__ = ('version',
                 'uri',
                 'description',
                 'sampleuse',
                 'domain',
                 'parameters',
                 'public',
                 'errors',
                 'pattern',
                 'related',
                 'response')

    def __init__(self, raw_def):
        """Build the definition from *raw_def*, the mapping loaded from YAML.

        Raises ``ValueError`` when ``public`` is not a boolean and
        ``Exception`` for an unknown pattern, a missing mandatory key or a
        non-public message whose pattern is ``error``.
        """
        # default value for non mandatory key
        self.version = ''
        self.parameters = OrderedDict()
        self.public = False
        self.errors = []
        self.related = []
        self.response = None
        self.sampleuse = None

        # loads yaml information into object
        for key, value in raw_def.items():
            if isinstance(value, str):
                value = value.strip()
            if key == 'public':
                if not isinstance(value, bool):
                    raise ValueError(_("{} must be a boolean, not {}").format(key, value))
            elif key == 'pattern':
                if value not in ['rpc', 'event', 'error']:
                    raise Exception(_('unknown pattern {}').format(value))
            elif key == 'parameters':
                if 'type' in value and isinstance(value['type'], str):
                    # should be a customtype
                    value = customtypes[value['type']].properties
                else:
                    value = _parse_parameters(value)
            elif key == 'response':
                value = ResponseDefinition(value)
            elif key == 'errors':
                value = _parse_error_definition(value)
            setattr(self, key, value)

        # check mandatory keys
        for key in self.__slots__:
            try:
                getattr(self, key)
            except AttributeError:
                raise Exception(_('mandatory key not set {} message').format(key))

        # message with pattern = error must be public
        if self.public is False and self.pattern == 'error':
            raise Exception(_('Error message must be public : {}').format(self.uri))


class ParameterDefinition:
    """Definition of one parameter of a message."""
    __slots__ = ('name',
                 'type',
                 'description',
                 'help',
                 'default',
                 'ref',
                 'shortarg')

    def __init__(self, name, raw_def):
        """Build the parameter *name* from *raw_def*, its YAML mapping.

        A type naming a custom type is replaced by that custom type's own
        ``type``; any other type is checked by ``_valid_type``. Every slot
        except ``default`` is mandatory.
        """
        self.name = name
        # default value for non mandatory key
        self.help = None
        self.ref = None
        self.shortarg = None

        # loads yaml information into object
        for key, value in raw_def.items():
            if isinstance(value, str):
                value = value.strip()
            if key == 'type':
                is_multi = value.startswith('[]')
                tvalue = value[2:] if is_multi else value
                if tvalue in customtypes:
                    resolved = customtypes[tvalue].type
                    value = '[]{}'.format(resolved) if is_multi else resolved
                else:
                    self._valid_type(value)
            setattr(self, key, value)

        # check mandatory keys
        for key in self.__slots__:
            try:
                getattr(self, key)
            except AttributeError:
                if key != 'default':
                    raise Exception(_('mandatory key not set "{}" in parameter').format(key))

    def _valid_type(self, typ):
        """Raise ``Exception`` when *typ*, stripped of any ``[]`` prefixes, is unknown."""
        if typ.startswith('[]'):
            self._valid_type(typ[2:])
        elif typ not in ['Boolean', 'String', 'Number', 'File', 'Dict', 'Any']:
            raise Exception(_('unknown parameter type: {}').format(typ))


class ResponseDefinition:
    """
    An ResponseDefinition is a representation of a response in the Zephir application messaging context
    """
    __slots__ = ('description',
                 'type',
                 'ref',
                 'parameters',
                 'required',
                 'multi')

    def __init__(self, responses):
        """Build the response from *responses*, its YAML mapping.

        ``parameters`` and ``required`` may not be given directly; they are
        copied from the custom type named by ``type``. A ``[]`` prefix on
        the type sets ``multi``.
        """
        self.ref = None
        self.parameters = None
        self.multi = False
        self.required = []
        for key, value in responses.items():
            if key in ['parameters', 'required']:
                raise Exception(_('parameters and required must be set with a custom type'))
            elif key == 'type':
                is_multi = value.startswith('[]')
                if is_multi:
                    tvalue = value[2:]
                    self.multi = True
                else:
                    tvalue = value
                if tvalue in customtypes:
                    custom_type = customtypes[tvalue]
                    self.parameters = custom_type.properties
                    self.required = custom_type.required
                    if is_multi:
                        value = '[]{}'.format(custom_type.type)
                    else:
                        value = custom_type.type
                else:
                    self._valid_type(value)
            setattr(self, key, value)

        # check mandatory keys
        for key in self.__slots__:
            try:
                getattr(self, key)
            except AttributeError:
                raise Exception(_('mandatory key not set {}').format(key))

    def _valid_type(self, typ):
        """Raise ``Exception`` when *typ*, stripped of any ``[]`` prefixes, is unknown."""
        if typ.startswith('[]'):
            self._valid_type(typ[2:])
        elif typ not in ['Boolean', 'String', 'Number', 'File', 'Dict']:
            raise Exception(_('unknown parameter type: {}').format(typ))


class ErrorDefinition:
    """
    An ErrorDefinition is a representation of an error in the Zephir application messaging context
    """
    __slots__ = ('uri',)

    def __init__(self, raw_err):
        """Build the error from *raw_err*; only the ``uri`` key is allowed."""
        extra_keys = set(raw_err) - set(self.__slots__)
        if extra_keys:
            raise Exception(_('extra keys for errors: {}').format(extra_keys))
        self.uri = raw_err['uri']


def _parse_error_definition(raw_defs):
    """Return the list of ``ErrorDefinition`` built from *raw_defs*."""
    return [ErrorDefinition(raw_err) for raw_err in raw_defs]


def _parse_parameters(raw_defs):
    """Return an ordered mapping name -> ``ParameterDefinition`` built from *raw_defs*."""
    parameters = OrderedDict()
    for name, raw_def in raw_defs.items():
        parameters[name] = ParameterDefinition(name, raw_def)
    return parameters


def parse_definition(filename: str):
    """Parse a YAML document and return the matching ``MessageDefinition``.

    Despite its name, *filename* is handed directly to the YAML loader;
    ``get_message`` passes the content of the message file.
    """
    return MessageDefinition(load(filename, Loader=SafeLoader))


def is_message_defined(uri):
    """Return True when the definition file of message *uri* exists."""
    version, message = split_message_uri(uri)
    path = get_message_file_path(version, message)
    return isfile(path)


def get_message(uri):
    """Load and return the ``MessageDefinition`` of message *uri*.

    Custom types are loaded first. Any failure is printed with its
    traceback and re-raised as an ``Exception`` naming the message.
    """
    load_customtypes()
    try:
        version, message = split_message_uri(uri)
        path = get_message_file_path(version, message)
        with open(path, "r") as message_file:
            message_content = parse_definition(message_file.read())
        message_content.version = version
        return message_content
    except Exception as err:
        traceback.print_exc()
        raise Exception(_('cannot parse message {}: {}').format(uri, str(err))) from err


def split_message_uri(uri):
    """Split *uri* at its first dot into ``[version, message]``."""
    return uri.split('.', 1)


def get_message_file_path(version, message):
    """Return the path of the YAML file describing *message* in *version*."""
    return join(MESSAGE_ROOT_PATH, version, 'messages', message + '.yml')


def list_messages():
    """Yield ``<version>.<message>`` for every ``.yml`` message file.

    Versions are visited in sorted order; files inside a version come in
    ``listdir`` order.
    """
    for version in sorted(listdir(MESSAGE_ROOT_PATH)):
        for message in listdir(join(MESSAGE_ROOT_PATH, version, 'messages')):
            if message.endswith('.yml'):
                yield version + '.' + message.rsplit('.', 1)[0]


def _convert_schema_type(typ, raw_def, types):
    """Translate the schema type *typ* using the *types* mapping.

    Types missing from *types* are returned unchanged (they are validated
    later). An ``array`` with an ``items`` entry becomes ``[]<item type>``.
    The recursion descends into ``items`` so that an array of arrays
    terminates instead of looping on the same definition.
    """
    if typ not in types:
        # validate after
        return typ
    if typ == 'array' and 'items' in raw_def:
        items = raw_def['items']
        if not isinstance(items, dict):
            raise Exception(_('items must be a dict'))
        if list(items.keys()) != ['type']:
            raise Exception(_('items must be a dict with only a type'))
        return '[]{}'.format(_convert_schema_type(items['type'], items, types))
    return types[typ]


class CustomParam:
    """One property of a custom type."""
    __slots__ = ('name',
                 'type',
                 'description',
                 'ref',
                 'default')

    def __init__(self, name, raw_def, required):
        """Build the property *name* from *raw_def*.

        A property not listed in *required* gets ``None`` as default. The
        ``items`` key is only used for type conversion and is not stored.
        """
        self.name = name
        self.ref = None
        if name not in required:
            self.default = None

        # loads yaml information into object
        for key, value in raw_def.items():
            if isinstance(value, str):
                value = value.strip()
            if key == 'type':
                value = self._convert_type(value, raw_def)
            elif key == 'items':
                continue
            setattr(self, key, value)

        # check mandatory keys
        for key in self.__slots__:
            try:
                getattr(self, key)
            except AttributeError:
                # default value for non mandatory key
                if key != 'default':
                    raise Exception(_('mandatory key not set "{}" in parameter').format(key))

    def _convert_type(self, typ, raw_def):
        """Translate the schema type *typ* of *raw_def* into an internal type name."""
        types = {'boolean': 'Boolean',
                 'string': 'String',
                 'number': 'Number',
                 'object': 'Dict',
                 'array': 'Array',
                 'file': 'File'}
        return _convert_schema_type(typ, raw_def, types)


def _parse_custom_params(raw_defs, required):
    """Return an ordered mapping name -> ``CustomParam`` built from *raw_defs*."""
    parameters = OrderedDict()
    for name, raw_def in raw_defs.items():
        parameters[name] = CustomParam(name, raw_def, required)
    return parameters


class CustomType:
    """A custom type loaded from a ``types/*.yml`` file."""
    __slots__ = ('title',
                 'type',
                 'description',
                 'ref',
                 'properties',
                 'required')

    def __init__(self, raw_def):
        """Build the custom type from *raw_def*; every slot except ``ref`` is mandatory."""
        # default value for non mandatory key
        self.ref = None

        # loads yaml information into object
        for key, value in raw_def.items():
            if isinstance(value, str):
                value = value.strip()
            if key == 'type':
                value = self._convert_type(value, raw_def)
            elif key == 'properties':
                value = _parse_custom_params(value, raw_def.get('required', {}))
            setattr(self, key, value)

        # check mandatory keys
        for key in self.__slots__:
            try:
                getattr(self, key)
            except AttributeError:
                raise Exception(_('mandatory key not set "{}" in parameter').format(key))

    def _convert_type(self, typ, raw_def):
        """Translate the schema type *typ* of *raw_def* into an internal type name."""
        types = {'boolean': 'Boolean',
                 'string': 'String',
                 'number': 'Number',
                 'object': 'Dict',
                 'array': 'Array'}
        return _convert_schema_type(typ, raw_def, types)

    def getname(self):
        """Return the title, used as key in ``customtypes``."""
        return self.title


# cache of every custom type, indexed by title (filled by load_customtypes)
customtypes = {}


def load_customtypes():
    """Fill the ``customtypes`` cache from the ``types/*.yml`` files.

    Does nothing when the cache is already filled. Properties whose type
    names another custom type are replaced by that custom type; a list of
    custom types (``[]CustomType``) is rejected. The cache is only updated
    once everything has been loaded and resolved, so a failure never
    leaves it half filled.
    """
    if customtypes:
        return
    loaded = {}
    for version in sorted(listdir(MESSAGE_ROOT_PATH)):
        types_dir = join(MESSAGE_ROOT_PATH, version, 'types')
        for filename in listdir(types_dir):
            if not filename.endswith('.yml'):
                continue
            type_name = filename.rsplit('.', 1)[0]
            with open(join(types_dir, filename), "r") as type_file:
                try:
                    custom_type = CustomType(load(type_file, Loader=SafeLoader))
                except Exception as err:
                    traceback.print_exc()
                    raise Exception('{} for {}'.format(err, type_name)) from err
            loaded[custom_type.getname()] = custom_type

    # replace references to other custom types by the custom type itself
    for customtype in loaded.values():
        properties = {}
        for key, value in customtype.properties.items():
            type_ = value.type
            is_multi = type_.startswith('[]')
            ttype_ = type_[2:] if is_multi else type_
            if ttype_ in loaded:
                if is_multi:
                    raise Exception(_('cannot have []CustomType'))
                properties[key] = loaded[ttype_]
            else:
                properties[key] = value
        customtype.properties = properties
    customtypes.update(loaded)


def _get_description(description, name):
    """Return the text used as option documentation.

    A single-line description of at most 150 characters is used as is,
    otherwise *name* is used. A trailing dot is removed.
    """
    # hack because some description are, in fact some help
    if '\n' not in description and len(description) <= 150:
        doc = description
    else:
        doc = name
    if doc.endswith('.'):
        # trim the chosen text, not the (possibly rejected) description
        doc = doc[:-1]
    return doc


def _get_option(name,
                arg,
                file_path,
                select_option,
                optiondescription):
    """generate option

    The option is mandatory when *arg* has no ``default`` attribute and
    always carries a calculated ``disabled`` property built from
    *select_option* and *optiondescription*. Raises ``Exception`` for an
    unsupported type.
    """
    props = []
    if not hasattr(arg, 'default'):
        props.append('mandatory')
    props.append(Calculation(calc_value,
                             Params(ParamValue('disabled'),
                                    kwargs={'condition': ParamOption(select_option, todict=True),
                                            'expected': ParamValue(optiondescription),
                                            'reverse_condition': ParamValue(True)}),
                             calc_value_property_help))

    description = arg.description.strip()
    kwargs = {'name': name,
              'doc': _get_description(description, name),
              'properties': frozenset(props),
              }
    if hasattr(arg, 'default'):
        kwargs['default'] = arg.default
    type_ = arg.type
    if type_.startswith('[]'):
        kwargs['multi'] = True
        type_ = type_[2:]
    if type_ == 'Dict':
        return DictOption(**kwargs)
    elif type_ == 'String':
        return StrOption(**kwargs)
    elif type_ == 'Any':
        return AnyOption(**kwargs)
    elif 'Number' in type_ or type_ == 'ID' or type_ == 'Integer':
        return IntOption(**kwargs)
    elif type_ == 'Boolean':
        return BoolOption(**kwargs)
    raise Exception('unsupported type {} in {}'.format(type_, file_path))


def _parse_args(message_def,
                options,
                file_path,
                needs,
                select_option,
                optiondescription,
                load_shortarg):
    """build option with args/kwargs

    Appends one option per parameter of *message_def* to *options* (plus a
    ``SymLinkOption`` for its short argument when *load_shortarg* is set)
    and records referenced parameters in *needs*.
    """
    parameters = message_def.parameters
    for name, arg in parameters.items():
        if arg.ref:
            needs.setdefault(message_def.uri, {}).setdefault(arg.ref, []).append(name)
    for name, arg in parameters.items():
        current_opt = _get_option(name,
                                  arg,
                                  file_path,
                                  select_option,
                                  optiondescription)
        options.append(current_opt)
        # not every parameter object has a 'shortarg' attribute
        if load_shortarg and getattr(arg, 'shortarg', None):
            options.append(SymLinkOption(arg.shortarg, current_opt))


# option class used for each response parameter type
_RESPONSE_OPTION_TYPES = {'String': StrOption,
                          'Number': IntOption,
                          'Boolean': BoolOption,
                          'Dict': DictOption,
                          # FIXME
                          'File': StrOption}


def _parse_responses(message_def,
                     file_path):
    """build option with returns

    Returns an ``OptionDescription`` named after the message uri, holding
    one option per response parameter, with the ``multi`` information set
    from the response. Raises ``Exception`` when the message has no
    response, when the response is not based on a custom type or when a
    parameter type is unknown.
    """
    response = message_def.response
    if response is None:
        # explicit error instead of an AttributeError on None
        raise Exception('rpc without response: {}'.format(file_path))
    if response.parameters is None:
        raise Exception('not implemented yet')
    options = []
    names = set()
    for name, obj in response.parameters.items():
        if name in names:
            raise Exception('multi response with name {} in {}'.format(name, file_path))
        names.add(name)

        kwargs = {'name': name,
                  'doc': obj.description.strip()}
        type_ = obj.type
        if type_.startswith('[]'):
            kwargs['multi'] = True
            type_ = type_[2:]
        option = _RESPONSE_OPTION_TYPES.get(type_)
        if not option:
            raise Exception(f'unknown param type {obj.type}')
        if hasattr(obj, 'default'):
            kwargs['default'] = obj.default
        else:
            kwargs['properties'] = ('mandatory',)
        options.append(option(**kwargs))
    od = OptionDescription(message_def.uri,
                           response.description,
                           options)
    od.impl_set_information('multi', response.multi)
    return od


def _getoptions_from_yml(message_def,
                         version,
                         optiondescriptions,
                         file_path,
                         needs,
                         select_option,
                         load_shortarg):
    """Store ``(description, options)`` of *message_def* in *optiondescriptions*.

    Raises ``Exception`` for an event with a response and only prints a
    warning for an rpc without response. *version* is currently unused.
    """
    if message_def.pattern == 'event' and message_def.response:
        raise Exception('event with response?: {}'.format(file_path))
    if message_def.pattern == 'rpc' and not message_def.response:
        print('rpc without response?: {}'.format(file_path))

    options = []
    _parse_args(message_def,
                options,
                file_path,
                needs,
                select_option,
                message_def.uri,
                load_shortarg)

    name = message_def.uri
    description = message_def.description.strip()
    optiondescriptions[name] = (description, options)


def _get_root_option(select_option,
                     optiondescriptions):
    """get root option

    Builds a tree of ``OptionDescription`` following the dotted message
    uris and returns the ``root`` description holding *select_option*
    followed by that tree.
    """
    def _get_od(curr_ods):
        options = []
        for name, content in curr_ods.items():
            if name is None:
                # leaf payload: (description, options) of the message itself
                options.extend(content[1])
                continue
            is_message = None in content
            if is_message:
                description = content[None][0]
                if description.endswith('.'):
                    description = description[:-1]
            else:
                description = None
            od = OptionDescription(name,
                                   description,
                                   _get_od(content))
            if is_message:
                od.impl_set_group_type(groups.message)
            options.append(od)
        return options

    options_obj = [select_option]
    struct_od = {}
    for od_name, options_descr in optiondescriptions.items():
        # od_name is something like config.configuration.server.get
        curr_od = struct_od
        for subod in od_name.split('.'):
            curr_od = curr_od.setdefault(subod, {})
        # struct_od is now {'config': {'configuration': {server: {get: {}}}}}
        curr_od[None] = options_descr
        # the innermost dict now holds {None: options_descr}
    options_obj.extend(_get_od(struct_od))
    return OptionDescription('root',
                             'root',
                             options_obj)


def get_messages(load_shortarg=False, only_public=False):
    """generate description from yml files

    Only ``rpc`` and ``event`` messages are kept, and only public ones
    when *only_public* is set. Returns ``(optiondescriptions_info, root)``
    where the first item maps each message uri to its pattern, public flag
    and (for rpc) response description, and *root* is the root
    ``OptionDescription``.
    """
    optiondescriptions = OrderedDict()
    optiondescriptions_info = {}
    needs = OrderedDict()

    # parse every message file only once and keep the selected ones
    selected = []
    for message_name in sorted(list_messages()):
        message_def = get_message(message_name)
        if message_def.pattern not in ['rpc', 'event'] or \
                (not message_def.public and only_public):
            continue
        selected.append((message_name, message_def))

    optiondescriptions_name = sorted(message_def.uri for _name, message_def in selected)
    select_option = ChoiceOption('message',
                                 'Nom du message.',
                                 tuple(optiondescriptions_name),
                                 properties=frozenset(['mandatory', 'positional']))

    for message_name, message_def in selected:
        info = {'pattern': message_def.pattern,
                'public': message_def.public}
        optiondescriptions_info[message_def.uri] = info
        if message_def.pattern == 'rpc':
            info['response'] = _parse_responses(message_def,
                                                message_name)
        elif message_def.response:
            raise Exception(f'response not allowed for {message_def.uri}')
        version = message_name.split('.')[0]
        _getoptions_from_yml(message_def,
                             version,
                             optiondescriptions,
                             message_name,
                             needs,
                             select_option,
                             load_shortarg)

    root = _get_root_option(select_option, optiondescriptions)
    # building a Config checks that the generated tree is accepted
    try:
        config = Config(root)
    except Exception as err:
        raise Exception('error when generating root optiondescription: {}'.format(err)) from err
    config.property.read_write()
    return optiondescriptions_info, root