"""loader flattened XML specific """ from os.path import join, isfile from lxml.etree import DTD import imp from tiramisu import (StrOption, OptionDescription, DynOptionDescription, PortOption, IntOption, ChoiceOption, BoolOption, SymLinkOption, IPOption, NetworkOption, NetmaskOption, DomainnameOption, BroadcastOption, URLOption, EmailOption, FilenameOption, UsernameOption, DateOption, PasswordOption, BoolOption, MACOption, Leadership, submulti, Params, ParamSelfOption, ParamOption, ParamDynOption, ParamValue, Calculation, calc_value) from .config import dtdfilename from .i18n import _ from .utils import normalize_family from .annotator import VARIABLE_NAMESPACE from .error import CreoleLoaderError FUNC_TO_DICT = ['valid_not_equal'] KNOWN_TAGS = ['family', 'variable', 'separators', 'leader', 'property'] class ConvertDynOptionDescription(DynOptionDescription): def convert_suffix_to_path(self, suffix): if not isinstance(suffix, str): suffix = str(suffix) return normalize_family(suffix, check_name=False) def convert_tiramisu_value(value, obj): """ convertit les variables dans le bon type si nécessaire """ def _convert_boolean(value): prop = {'True': True, 'False': False, 'None': None} if value not in prop: raise Exception('unknown value {} while trying to cast {} to boolean'.format(value, obj)) return prop[value] if value is None: return value func = {IntOption: int, BoolOption: _convert_boolean}.get(obj, None) if func is None: return value if isinstance(value, list): return [func(val) for val in value] else: return func(value) CONVERT_OPTION = {'number': dict(opttype=IntOption), 'choice': dict(opttype=ChoiceOption), 'string': dict(opttype=StrOption), 'password': dict(opttype=PasswordOption), 'mail': dict(opttype=EmailOption), 'boolean': dict(opttype=BoolOption), 'symlink': dict(opttype=SymLinkOption), 'filename': dict(opttype=FilenameOption), 'date': dict(opttype=DateOption), 'unix_user': dict(opttype=UsernameOption), 'ip': dict(opttype=IPOption, initkwargs={'allow_reserved': True}), 'local_ip': dict(opttype=IPOption, initkwargs={'private_only': True, 'warnings_only': True}), 'netmask': dict(opttype=NetmaskOption), 'network': dict(opttype=NetworkOption), 'broadcast': dict(opttype=BroadcastOption), 'netbios': dict(opttype=DomainnameOption, initkwargs={'type': 'netbios', 'warnings_only': True}), 'domain': dict(opttype=DomainnameOption, initkwargs={'type': 'domainname', 'allow_ip': True, 'allow_without_dot': True}), 'domain_strict': dict(opttype=DomainnameOption, initkwargs={'type': 'domainname', 'allow_ip': False}), 'hostname': dict(opttype=DomainnameOption, initkwargs={'type': 'hostname', 'allow_ip': True}), 'hostname_strict': dict(opttype=DomainnameOption, initkwargs={'type': 'hostname', 'allow_ip': False}), 'web_address': dict(opttype=URLOption, initkwargs={'allow_ip': True, 'allow_without_dot': True}), 'port': dict(opttype=PortOption, initkwargs={'allow_private': True}), 'mac': dict(opttype=MACOption), 'cidr': dict(opttype=IPOption, initkwargs={'cidr': True}), 'network_cidr': dict(opttype=NetworkOption, initkwargs={'cidr': True}), } class Elt: def __init__(self, attrib): self.attrib = attrib class PopulateTiramisuObjects: def __init__(self): self.storage = ElementStorage() self.booleans = [] def parse_dtd(self, dtdfilename): """Loads the Creole DTD :raises IOError: if the DTD is not found :param dtdfilename: the full filename of the Creole DTD """ if not isfile(dtdfilename): raise IOError(_("no such DTD file: {}").format(dtdfilename)) with open(dtdfilename, 'r') as dtdfd: dtd = DTD(dtdfd) for elt in dtd.iterelements(): if elt.name == 'variable': for attr in elt.iterattributes(): if set(attr.itervalues()) == set(['True', 'False']): self.booleans.append(attr.name) def get_root_family(self): family = Family(Elt({'name': 'baseoption', 'doc': 'baseoption'}), self.booleans, self.storage, self.eosfunc, ) self.storage.add('.', family) return family def reorder_family(self, xmlroot): xmlelts = [] for xmlelt in xmlroot: # VARIABLE_NAMESPACE family has to be loaded before any other family # because `extra` family could use `VARIABLE_NAMESPACE` variables. if xmlelt.attrib['name'] == VARIABLE_NAMESPACE: xmlelts.insert(0, xmlelt) else: xmlelts.append(xmlelt) return xmlelts def make_tiramisu_objects(self, xmlroot, eosfunc): self.eosfunc = imp.load_source('eosfunc', eosfunc) family = self.get_root_family() for xmlelt in self.reorder_family(xmlroot): self.iter_family(xmlelt, family, None, ) def iter_family(self, child, family, subpath, ): if child.tag not in KNOWN_TAGS: raise CreoleLoaderError(_('unknown tag {}').format(child.tag)) if child.tag in ['family', 'leader']: self.populate_family(family, child, subpath, ) elif child.tag == 'separators': self.parse_separators(child) elif child.tag == 'variable': self.populate_variable(child, subpath, family) elif child.tag == 'property': self.parse_properties(family, child) else: raise Exception('unknown tag {}'.format(child.tag)) def populate_family(self, parent_family, elt, subpath, ): path = self.build_path(subpath, elt, ) family = Family(elt, self.booleans, self.storage, self.eosfunc, ) self.storage.add(path, family) if elt.tag == 'leader': family.set_leader() parent_family.add(family) if len(elt) != 0: for child in elt: self.iter_family(child, family, path, ) def parse_separators(self, separators, ): for separator in separators: elt = self.storage.get(separator.attrib['name']) elt.add_information('separator', separator.text) def populate_variable(self, elt, subpath, family): is_follower = False is_leader = False if family.is_leader: if elt.attrib['name'] != family.attrib['name']: is_follower = True else: is_leader = True path = self.build_path(subpath, elt, ) variable = Variable(elt, self.booleans, self.storage, is_follower, is_leader, self.eosfunc, ) self.storage.add(path, variable) family.add(variable) def parse_properties(self, family, child): if child.get('type') == 'calculation': kwargs = {'condition': child.attrib['source'], 'expected': ParamValue(child.attrib.get('expected'))} if child.attrib['inverse'] == 'True': kwargs['reverse_condition'] = ParamValue(True) family.attrib['properties'].append((ParamValue(child.text), kwargs)) else: family.attrib['properties'].append(child.text) def build_path(self, subpath, elt, ): if subpath is None: return elt.attrib['name'] return subpath + '.' + elt.attrib['name'] class ElementStorage: def __init__(self): self.paths = {} def add(self, path, elt): if path in self.paths: raise CreoleLoaderError(_('path already loaded {}').format(path)) self.paths[path] = elt def get(self, path): if path not in self.paths: raise CreoleLoaderError(_('there is no element for path {}').format(path)) return self.paths[path] class Common: def build_properties(self): for index, prop in enumerate(self.attrib['properties']): if isinstance(prop, tuple): action, kwargs = prop kwargs['condition'] = ParamOption(self.storage.get(kwargs['condition']).get(), todict=True) prop = Calculation(calc_value, Params(action, kwargs=kwargs)) self.attrib['properties'][index] = prop if self.attrib['properties']: self.attrib['properties'] = tuple(self.attrib['properties']) else: del self.attrib['properties'] class Variable(Common): def __init__(self, elt, booleans, storage, is_follower, is_leader, eosfunc): self.option = None self.informations = {} self.attrib = {} if elt.attrib['type'] != 'symlink': self.attrib['properties'] = [] self.attrib['validators'] = [] self.eosfunc = eosfunc self.storage = storage self.booleans = booleans self.populate_attrib(elt) self.is_follower = is_follower self.is_leader = is_leader convert_option = CONVERT_OPTION[elt.attrib['type']] self.object_type = convert_option['opttype'] self.populate_choice(elt) for child in elt: self.populate_property(child) self.populate_value(child) self.populate_check(child) if 'initkwargs' in convert_option: self.attrib.update(convert_option['initkwargs']) if elt.attrib['type'] == 'symlink': self.attrib['opt'] = storage.get(self.attrib['opt']) def populate_attrib(self, elt, ): for key, value in elt.attrib.items(): if key == 'multi' and elt.attrib['type'] == 'symlink': continue if key == 'multi' and value == 'submulti': value = submulti elif key in self.booleans: if value == 'True': value = True elif value == 'False': value = False else: raise CreoleLoaderError(_('unknown value {} for {}').format(value, key)) if key in ['help', 'test']: self.add_information(key, value) elif key != 'type': self.attrib[key] = value def populate_choice(self, elt, ): if elt.attrib['type'] == 'choice': values = [] for child in elt: if child.tag == 'choice': value = child.text if child.attrib['type'] == 'number': value = int(value) values.append(value) self.attrib['values'] = tuple(values) def populate_property(self, child): if child.tag == 'property': if child.get('type') == 'calculation': kwargs = {'condition': child.attrib['source'], 'expected': ParamValue(child.attrib.get('expected'))} if child.attrib['inverse'] == 'True': kwargs['reverse_condition'] = ParamValue(True) self.attrib['properties'].append((ParamValue(child.text), kwargs)) else: self.attrib['properties'].append(child.text) def populate_value(self, child): if child.tag == 'value': if child.attrib.get('type') == 'calculation': if child.text is not None and child.text.strip(): self.attrib['default'] = (child.text.strip(),) else: params = [] for param in child: params.append(self.parse_param(param)) self.attrib['default'] = (child.attrib['name'], params, False) else: if "type" in child.attrib: type_ = CONVERT_OPTION[child.attrib['type']]['opttype'] else: type_ = self.object_type if self.attrib['multi'] is True and not self.is_follower: if 'default' not in self.attrib: self.attrib['default'] = [] value = convert_tiramisu_value(child.text, type_) self.attrib['default'].append(value) if 'default_multi' not in self.attrib and not self.is_leader: self.attrib['default_multi'] = value elif self.attrib['multi'] == submulti: if 'default' not in self.attrib: self.attrib['default'] = [] value = convert_tiramisu_value(child.text, type_) if not self.is_follower: if not isinstance(value, list): dvalue = [value] else: dvalue = value self.attrib['default'].append(dvalue) if value and 'default_multi' not in self.attrib and not self.is_leader: self.attrib['default_multi'] = [] if not self.is_leader: self.attrib['default_multi'].append(value) else: if 'default' in self.attrib: raise CreoleLoaderError(_('default value already set for {}' '').format(self.attrib['path'])) value = convert_tiramisu_value(child.text, type_) if self.is_follower: self.attrib['default_multi'] = value else: self.attrib['default'] = value def populate_check(self, child): if child.tag == 'check': params = [] for param in child: params.append(self.parse_param(param)) #check.params = params self.attrib['validators'].append((child.attrib['name'], params, child.attrib['warnings_only'])) def parse_param(self, param): name = param.attrib.get('name', '') if param.attrib['type'] == 'string': value = param.text elif param.attrib['type'] == 'variable': transitive = param.attrib.get('transitive', 'False') if transitive == 'True': transitive = True elif transitive == 'False': transitive = False else: raise CreoleLoaderError(_('unknown transitive boolean {}').format(transitive)) value = [param.text, transitive, param.attrib.get('suffix')] elif param.attrib['type'] == 'number': value = int(param.text) else: raise CreoleLoaderError(_('unknown param type {}').format(param.attrib['type'])) return(name, value) def add_information(self, key, value): if key in self.informations: raise CreoleLoaderError(_('key already exists in information {}').format(key)) self.informations[key] = value def build_calculator(self, key): if key in self.attrib: values = self.attrib[key] if isinstance(values, list): is_list = True else: is_list = False values = [values] ret = [] for value in values: if isinstance(value, tuple): if key == 'validators': args = [ParamSelfOption()] else: args = [] kwargs = {} if len(value) == 3: for param in value[1]: if isinstance(param[1], list): option = self.storage.get(param[1][0]).get() param_kwargs = {'notraisepropertyerror': param[1][1]} if value[0] in FUNC_TO_DICT: param_kwargs['todict'] = True if not param[1][2]: param_value = ParamOption(option, **param_kwargs, ) else: family = '.'.join(param[1][0].split('.', 3)[:2]) param_value = ParamDynOption(option, param[1][2], self.storage.get(family).get(), **param_kwargs, ) else: param_value = ParamValue(param[1]) if not param[0]: args.append(param_value) else: kwargs[param[0]] = param_value ret.append(Calculation(getattr(self.eosfunc, value[0]), Params(tuple(args), kwargs=kwargs))) else: ret.append(value) if not is_list: self.attrib[key] = ret[0] else: self.attrib[key] = ret def get(self): if self.option is None: if self.object_type is SymLinkOption: self.attrib['opt'] = self.attrib['opt'].get() else: self.build_properties() self.build_calculator('default') self.build_calculator('validators') if not self.attrib['validators']: del self.attrib['validators'] try: option = self.object_type(**self.attrib) except Exception as err: import traceback traceback.print_exc() name = self.attrib['name'] raise CreoleLoaderError(_('cannot create option "{}": {}').format(name, err)) for key, value in self.informations.items(): option.impl_set_information(key, value) self.option = option return self.option class Family(Common): def __init__(self, elt, booleans, storage, eosfunc, ): self.option = None self.attrib = {} self.is_leader = False self.informations = {} self.children = [] self.storage = storage self.eosfunc = eosfunc self.attrib['properties'] = [] for key, value in elt.attrib.items(): if key == 'help': self.add_information(key, value) else: self.attrib[key] = value def add(self, child): self.children.append(child) def add_information(self, key, value): if key in self.informations and not (key == 'icon' and self.informations[key] is None): raise CreoleLoaderError(_('key already exists in information {}').format(key)) self.informations[key] = value def set_leader(self): self.is_leader = True def get(self): if self.option is None: self.attrib['children'] = [] for child in self.children: self.attrib['children'].append(child.get()) self.build_properties() try: if 'dynamic' in self.attrib: dynamic = self.storage.get(self.attrib['dynamic']).get() del self.attrib['dynamic'] self.attrib['suffixes'] = Calculation(self.eosfunc.calc_value, Params((ParamOption(dynamic),))) option = ConvertDynOptionDescription(**self.attrib) elif not self.is_leader: option = OptionDescription(**self.attrib) else: option = Leadership(**self.attrib) except Exception as err: print(self.attrib) raise CreoleLoaderError(_('cannot create optiondescription "{}": {}').format(self.attrib['name'], err)) for key, value in self.informations.items(): option.impl_set_information(key, value) self.option = option return self.option def load(xmlroot: str, dtd_path: str, funcs_path: str): tiramisu_objects = PopulateTiramisuObjects() tiramisu_objects.parse_dtd(dtd_path) tiramisu_objects.make_tiramisu_objects(xmlroot, funcs_path) return tiramisu_objects.storage.paths['.'].get()