""" Creole flattener. Takes a bunch of Creole XML dispatched in differents folders as an input and outputs a human readable flatened XML Sample usage:: >>> from creole.objspace import CreoleObjSpace >>> eolobj = CreoleObjSpace('/usr/share/creole/creole.dtd') >>> eolobj.create_or_populate_from_xml('creole', ['/usr/share/eole/creole/dicos']) >>> eolobj.space_visitor() >>> eolobj.save('/tmp/creole_flatened_output.xml') The CreoleObjSpace - loads the XML into an internal CreoleObjSpace representation - visits/annotates the objects - dumps the object space as XML output into a single XML target The visit/annotation stage is a complex step that corresponds to the Creole procedures. For example: a variable is redefined and shall be moved to another family means that a variable1 = Variable() object in the object space who lives in the family1 parent has to be moved in family2. The visit procedure changes the varable1's object space's parent. """ from collections import OrderedDict from lxml.etree import Element, SubElement # pylint: disable=E0611 from json import dump from .i18n import _ from .xmlreflector import XMLReflector, HIGH_COMPATIBILITY from .annotator import ERASED_ATTRIBUTES, ActionAnnotator, ServiceAnnotator, SpaceAnnotator from .utils import normalize_family from .error import CreoleOperationError, SpaceObjShallNotBeUpdated, CreoleDictConsistencyError # CreoleObjSpace's elements like 'family' or 'slave', that shall be forced to the Redefinable type FORCE_REDEFINABLES = ('family', 'slave', 'service', 'disknod', 'variables', 'family_action') # CreoleObjSpace's elements that shall be forced to the UnRedefinable type FORCE_UNREDEFINABLES = ('value', 'input', 'profile', 'ewtapp', 'tag', 'saltaction') # CreoleObjSpace's elements that shall be set to the UnRedefinable type UNREDEFINABLE = ('submulti', 'multi', 'type') PROPERTIES = ('hidden', 'frozen', 'auto_freeze', 'auto_save', 'force_default_on_freeze', 'force_store_value', 'disabled', 'mandatory') 
# Properties that are exported under one or several other property names
CONVERT_PROPERTIES = {'auto_save': ['force_store_value'],
                      'auto_freeze': ['force_store_value', 'auto_freeze']}

# Attributes that are renamed when exported as XML attributes
RENAME_ATTIBUTES = {'description': 'doc'}

# Groups of attributes that cannot be set together on the same object
INCOMPATIBLE_ATTRIBUTES = [['multi', 'submulti']]

#TYPE_TARGET_CONDITION = ('variable', 'family')

# _____________________________________________________________________________
# special types definitions for the Object Space's internal representation
class RootCreoleObject(object):
    """Common base class of every type of the object space."""


class CreoleObjSpace(object):
    """DOM XML reflexion free internal representation of a Creole Dictionary
    """
    choice = type('Choice', (RootCreoleObject,), OrderedDict())
    property_ = type('Property', (RootCreoleObject,), OrderedDict())
    # Creole ObjectSpace's Leadership variable class type
    Leadership = type('Leadership', (RootCreoleObject,), OrderedDict())
    # The Atom type stands for singleton, that is an Object Space's atom
    # object is present only once in the object space's tree
    Atom = type('Atom', (RootCreoleObject,), OrderedDict())
    # An object that can be redefined (stored by name in an OrderedDict)
    Redefinable = type('Redefinable', (RootCreoleObject,), OrderedDict())
    # An object that can't be redefined (stored in a list)
    UnRedefinable = type('UnRedefinable', (RootCreoleObject,), OrderedDict())

    def __init__(self, dtdfilename):  # pylint: disable=R0912
        """Build the object space types from the DTD.

        For every element declared in the DTD a class named after the element
        is created and attached to this instance; its base class (Atom,
        Redefinable or UnRedefinable) and its default attributes are deduced
        from the DTD attributes.

        :param dtdfilename: DTD file name, handed to ``XMLReflector.parse_dtd``
        """
        self.index = 0

        class ObjSpace(object):  # pylint: disable=R0903
            """
            Base object space
            """
        self.space = ObjSpace()
        self.xmlreflector = XMLReflector()
        self.xmlreflector.parse_dtd(dtdfilename)
        self.redefine_variables = None
        self.probe_variables = []

        # ['variable', 'separator', 'family']
        self.forced_text_elts = set()
        self.forced_text_elts_as_name = {'choice', 'property'}
        self.forced_choice_option = {}
        self.paths = Path()
        self.list_conditions = {}
        self.booleans_attributs = []

        for elt in self.xmlreflector.dtd.iterelements():
            attrs = {}
            clstype = self.UnRedefinable
            atomic = True
            forced_text_elt = elt.type == 'mixed'
            if elt.name == 'service':
                self.parse_dtd_right_left_elt(elt.content)
            for attr in elt.iterattributes():
                # an element with at least one attribute is never an Atom
                atomic = False
                values = list(attr.itervalues())
                if attr.default_value:
                    if attr.default_value == 'True':
                        default_value = True
                    elif attr.default_value == 'False':
                        default_value = False
                    else:
                        default_value = attr.default_value
                    attrs[attr.name] = default_value
                if not attr.name.endswith('_type') and values:
                    self.forced_choice_option.setdefault(elt.name, {})[attr.name] = values

                if attr.name == 'redefine':
                    clstype = self.Redefinable
                if attr.name == 'name' and forced_text_elt is True:
                    # the element has its own 'name' attribute: its text is
                    # kept as text
                    self.forced_text_elts.add(elt.name)
                    forced_text_elt = False

                if set(values) == {'True', 'False'}:
                    self.booleans_attributs.append(attr.name)

            if forced_text_elt is True:
                # mixed element without 'name' attribute: its text is its name
                self.forced_text_elts_as_name.add(elt.name)

            if elt.name in FORCE_REDEFINABLES:
                clstype = self.Redefinable
            elif elt.name in FORCE_UNREDEFINABLES:
                clstype = self.UnRedefinable
            elif atomic:
                clstype = self.Atom

            # Creole ObjectSpace class types, it enables us to create objects like:
            # Service_restriction(), Ip(), Interface(), Host(), Fstab(), Package(), Disknod(),
            # File(), Variables(), Family(), Variable(), Separators(), Separator(), Value(),
            # Constraints()... and so on. Creole ObjectSpace is an object's reflexion of
            # the XML elements
            setattr(self, elt.name, type(elt.name.capitalize(), (clstype,), attrs))

    def parse_dtd_right_left_elt(self, elt):
        """Walk down the right-hand side of a DTD content model while it is an 'or' node.

        The walk has no side effect in this implementation.
        """
        if elt.right.type == 'or':
            self.parse_dtd_right_left_elt(elt.right)

    def _convert_boolean(self, value):  # pylint: disable=R0201
        """Boolean coercion. The Creole XML may contain strings like `True` or `False`

        :param value: a bool, or one of the strings 'True' / 'False'
        :return: the corresponding bool
        :raises TypeError: for any other value
        """
        if isinstance(value, bool):
            return value
        if value == 'True':
            return True
        if value == 'False':
            return False
        raise TypeError(_('{} is not True or False').format(value))  # pragma: no cover

    def _is_already_exists(self, name, space, child, namespace):
        """Tell whether the object described by `child` is already present.

        Inside a family the lookup is done in the registered variable paths,
        otherwise in the `child.tag` container of `space`.
        """
        if isinstance(space, self.family):  # pylint: disable=E1101
            if namespace != 'creole':
                name = space.path + '.' + name
            return self.paths.path_is_defined(name)
        if child.tag in ['family', 'family_action']:
            norm_name = normalize_family(name)
        else:
            norm_name = name
        return norm_name in getattr(space, child.tag, {})

    def _translate_in_space(self, name, family, variable, namespace):
        """Return the existing object named `name`, moving it into `family` if needed.

        When `family` is a family object and the variable is currently
        registered in another family, the variable object is removed from its
        old family, attached to `family` and registered again in the paths.
        """
        if not isinstance(family, self.family):  # pylint: disable=E1101
            if variable.tag in ['family', 'family_action']:
                norm_name = normalize_family(name)
            else:
                norm_name = name
            return getattr(family, variable.tag)[norm_name]
        if namespace == 'creole':
            path = name
        else:
            path = family.path + '.' + name
        old_family_name = self.paths.get_variable_family_name(path)
        if normalize_family(family.name) == old_family_name:
            # already in the right family, nothing to move
            return getattr(family, variable.tag)[name]
        old_family = self.space.variables['creole'].family[old_family_name]  # pylint: disable=E1101
        variable_obj = old_family.variable[name]
        del old_family.variable[name]
        if 'variable' not in vars(family):
            family.variable = OrderedDict()
        family.variable[name] = variable_obj
        self.paths.add_variable(namespace,
                                name,
                                family.name,
                                variable_obj,
                                )
        return variable_obj

    def remove_check(self, name):  # pylint: disable=C0111
        """Remove, in place, every check whose target is `name`.

        Does nothing when the space has no constraints or no check.
        """
        constraints = getattr(self.space, 'constraints', None)
        if constraints is None or not hasattr(constraints, 'check'):
            return
        checks = constraints.check  # pylint: disable=E1101
        checks[:] = [check for check in checks
                     if not (hasattr(check, 'target') and check.target == name)]

    def remove_condition(self, name):  # pylint: disable=C0111
        """Remove, in place, `name` from the targets of every condition.

        Does nothing when the space has no constraints or no condition (same
        guard as `remove_check`).
        """
        constraints = getattr(self.space, 'constraints', None)
        if constraints is None or not hasattr(constraints, 'condition'):
            return
        for condition in constraints.condition:  # pylint: disable=E1101
            if hasattr(condition, 'target'):
                condition.target[:] = [target for target in condition.target
                                       if target.name != name]

    def create_or_update_space_object(self, subspace, space, child, namespace):
        """Creates or retrieves the space object that corresponds
        to the `child` XML object

        Two attributes of the `child` XML object drive the decision:

        - `redefine`: when the object already exists, `True` means that the
          existing space object is returned in order to be updated. When the
          attribute is missing it defaults to `True` for the FORCE_REDEFINABLES
          elements and to `False` otherwise.
        - `exists`: when the object already exists and is not redefined,
          `False` means that the existing object must be left untouched; when
          the object does not exist yet, `True` allows its creation even if
          `redefine` is `True`.

        :param subspace: attributes of the `child` XML object
        :return: the corresponding space object of the `child` XML object
        :raises SpaceObjShallNotBeUpdated: the object exists and `exists` is False
        :raises CreoleDictConsistencyError: the object exists and cannot be
                                            re-created, or is redefined but
                                            does not exist yet
        """
        if child.tag in self.forced_text_elts_as_name:
            name = child.text
        else:
            name = subspace['name']
        if self._is_already_exists(name, space, child, namespace):
            default_redefine = child.tag in FORCE_REDEFINABLES
            redefine = self._convert_boolean(subspace.get('redefine', default_redefine))
            exists = self._convert_boolean(subspace.get('exists', True))
            if redefine is True:
                return self._translate_in_space(name, space, child, namespace)
            if exists is False:
                raise SpaceObjShallNotBeUpdated()
            raise CreoleDictConsistencyError(_('Already present in another XML file, {} '
                                               'cannot be re-created').format(name))
        redefine = self._convert_boolean(subspace.get('redefine', False))
        exists = self._convert_boolean(subspace.get('exists', False))
        if redefine is False or exists is True:
            return getattr(self, child.tag)()
        raise CreoleDictConsistencyError(_('Redefined object: '
                                           '{} does not exist yet').format(name))

    def generate_creoleobj(self, child, space, namespace):
        """
        Instantiates or retrieves the Creole object that corresponds to `child`
        and makes sure that `space` has a container for it
        """
        if issubclass(getattr(self, child.tag), self.Redefinable):
            creoleobj = self.create_or_update_space_object(child.attrib, space, child, namespace)
        else:
            # instantiates an object from the CreoleObjSpace's builtins types
            # example : child.tag = constraints -> a self.Constraints() object is created
            creoleobj = getattr(self, child.tag)()
        # this Atom instance has to be a singleton here
        # we do not re-create it, we reuse it
        if isinstance(creoleobj, self.Atom) and child.tag in vars(space):
            creoleobj = getattr(space, child.tag)
        self.create_tree_structure(space, child, creoleobj)
        return creoleobj

    def create_tree_structure(self, space, child, creoleobj):  # pylint: disable=R0201
        """
        Builds the tree structure of the object space

        The container attribute named after `child.tag` is created on `space`
        (if it does not exist yet) in order to be populated later on.

        for example::

            space = Family()
            space.variable = OrderedDict()

        another example::

            space = Variable()
            space.value = list()

        :raises CreoleOperationError: `creoleobj` is of none of the known types
        """
        if child.tag in vars(space):
            return
        if isinstance(creoleobj, self.Redefinable):
            setattr(space, child.tag, OrderedDict())
        elif isinstance(creoleobj, self.UnRedefinable):
            setattr(space, child.tag, [])
        elif not isinstance(creoleobj, self.Atom):  # pragma: no cover
            raise CreoleOperationError(_("Creole object {} "
                                         "has a wrong type").format(type(creoleobj)))

    def _add_to_tree_structure(self, creoleobj, space, child):  # pylint: disable=R0201
        """Store `creoleobj` in `space`, in the container that matches its type."""
        if isinstance(creoleobj, self.Redefinable):
            name = creoleobj.name
            if child.tag in ('family', 'family_action'):
                name = normalize_family(name)
            getattr(space, child.tag)[name] = creoleobj
        elif isinstance(creoleobj, self.UnRedefinable):
            getattr(space, child.tag).append(creoleobj)
        else:
            setattr(space, child.tag, creoleobj)

    def _set_text_to_obj(self, child, creoleobj):
        """Copy the stripped, non empty text of `child` to `creoleobj`.

        The text becomes the `name` of the object for the elements listed in
        `forced_text_elts_as_name`, its `text` otherwise.
        """
        text = None if child.text is None else child.text.strip()
        if not text:
            return
        if child.tag in self.forced_text_elts_as_name:
            creoleobj.name = text
        else:
            creoleobj.text = text

    def _set_xml_attributes_to_obj(self, child, creoleobj):
        """Copy the XML attributes of `child` to `creoleobj`.

        :raises CreoleDictConsistencyError: an UNREDEFINABLE attribute is set
                                            on a redefined object, or
                                            incompatible attributes are set
                                            together
        :raises CreoleOperationError: a boolean attribute is neither 'True'
                                      nor 'False'
        """
        redefine = self._convert_boolean(child.attrib.get('redefine', False))
        has_value = hasattr(creoleobj, 'value')
        if HIGH_COMPATIBILITY and has_value:
            has_value = len(child) != 1 or child[0].text is not None
        if (redefine is True and child.tag == 'variable' and has_value
                and len(child) != 0):
            # the redefined variable comes with its own children: forget the
            # previous values
            del creoleobj.value
        for attr, val in child.attrib.items():
            if redefine and attr in UNREDEFINABLE:
                # UNREDEFINABLE concerns only 'variable' node so we can fix name
                # to child.attrib['name']
                name = child.attrib['name']
                raise CreoleDictConsistencyError(_("cannot redefine attribute {} for variable {}").format(attr, name))
            if isinstance(getattr(creoleobj, attr, None), bool):
                if val == 'False':
                    val = False
                elif val == 'True':
                    val = True
                else:  # pragma: no cover
                    raise CreoleOperationError(_('value for {} must be True or False, '
                                                 'not {}').format(attr, val))
            # never overwrite a name that is already set
            if not (attr == 'name' and getattr(creoleobj, 'name', None) is not None):
                setattr(creoleobj, attr, val)
        keys = list(vars(creoleobj).keys())
        for incompatible in INCOMPATIBLE_ATTRIBUTES:
            found = False
            for inc in incompatible:
                if inc in keys:
                    if found:
                        raise CreoleDictConsistencyError(_('those attributes are incompatible {}').format(incompatible))
                    found = True

    def _creoleobj_tree_visitor(self, child, creoleobj, namespace):
        """Creole object tree manipulations
        """
        # attribute values are strings: compare explicitly with 'True',
        # otherwise remove_check="False" would be considered as true
        if child.tag == 'variable' and child.attrib.get('remove_check', 'False') == 'True':
            self.remove_check(creoleobj.name)
        if child.tag == 'variable' and child.attrib.get('remove_condition', 'False') == 'True':
            self.remove_condition(creoleobj.name)
        if child.tag in ['auto', 'fill', 'check']:
            variable_name = child.attrib['target']
            # XXX not working with variable not in creole and in leader/followers
            creoleobj.redefine = variable_name in self.redefine_variables
            if not hasattr(creoleobj, 'index'):
                creoleobj.index = self.index
        if child.tag in ['auto', 'fill', 'condition', 'check', 'action']:
            creoleobj.namespace = namespace

    def xml_parse_document(self, document, space, namespace, is_in_family=False):
        """Parses a Creole XML file
        populates the CreoleObjSpace

        :raises CreoleDictConsistencyError: a family is set several times in
                                            `document`
        """
        family_names = []
        for child in document:
            # this index enables us to reorder the 'fill' and 'auto' objects
            self.index += 1
            # doesn't proceed the XML commentaries
            if not isinstance(child.tag, str):
                continue
            if child.tag == 'family':
                is_in_family = True
                if child.attrib['name'] in family_names:
                    raise CreoleDictConsistencyError(_('Family {} is set several times').format(child.attrib['name']))
                family_names.append(child.attrib['name'])
            if child.tag == 'variables':
                child.attrib['name'] = namespace
            if HIGH_COMPATIBILITY and child.tag == 'value' and child.text is None:
                continue
            # creole objects creation
            try:
                creoleobj = self.generate_creoleobj(child, space, namespace)
            except SpaceObjShallNotBeUpdated:
                continue
            self._set_text_to_obj(child, creoleobj)
            self._set_xml_attributes_to_obj(child, creoleobj)
            self._creoleobj_tree_visitor(child, creoleobj, namespace)
            self._fill_creoleobj_path_attribute(space, child, namespace, document, creoleobj)
            self._add_to_tree_structure(creoleobj, space, child)
            if len(child) != 0:
                self.xml_parse_document(child, creoleobj, namespace, is_in_family)

    def _fill_creoleobj_path_attribute(self, space, child, namespace, document, creoleobj):  # pylint: disable=R0913
        """Fill self.paths attributes
        """
        if isinstance(space, self.help):  # pylint: disable=E1101
            return
        if child.tag == 'variable':
            family_name = normalize_family(document.attrib['name'])
            self.paths.add_variable(namespace,
                                    child.attrib['name'],
                                    family_name,
                                    creoleobj)
            if child.attrib.get('redefine', 'False') == 'True':
                if namespace == 'creole':
                    self.redefine_variables.append(child.attrib['name'])
                else:
                    self.redefine_variables.append(namespace + '.' + family_name + '.' +
                                                   child.attrib['name'])
        if child.tag == 'family':
            family_name = normalize_family(child.attrib['name'])
            if namespace != 'creole':
                family_name = namespace + '.' + family_name
            self.paths.add_family(namespace,
                                  family_name,
                                  creoleobj,
                                  )
            creoleobj.path = self.paths.get_family_path(family_name, namespace)

    def create_or_populate_from_xml(self, namespace, xmlfolders, from_zephir=None):
        """Parses a bunch of XML files
        populates the CreoleObjSpace
        """
        documents = self.xmlreflector.load_xml_from_folders(xmlfolders, from_zephir)
        for _xmlfile, document in documents:
            # the list of redefined variables is local to each XML file
            self.redefine_variables = []
            self.xml_parse_document(document, self.space, namespace)

    def populate_from_zephir(self, namespace, xmlfile):
        """Parses a single XML file in zephir mode and populates the CreoleObjSpace
        """
        self.redefine_variables = []
        document = self.xmlreflector.parse_xmlfile(xmlfile, from_zephir=True, zephir2=True)
        self.xml_parse_document(document, self.space, namespace)

    def space_visitor(self, eosfunc_file):  # pylint: disable=C0111
        """Run the action, service and space annotators on this object space.

        :param eosfunc_file: handed as is to `SpaceAnnotator`
        """
        ActionAnnotator(self)
        ServiceAnnotator(self)
        SpaceAnnotator(self, eosfunc_file)

    def save(self, filename, force_no_save=False):
        """Save an XML output on disk

        :param filename: the full XML filename
        :param force_no_save: if True the XML is built but not written on disk
        :return: the root XML element
        """
        xml = Element('creole')
        self._xml_export(xml, self.space)
        if not force_no_save:
            self.xmlreflector.save_xmlfile(filename, xml)
        return xml

    def save_probes(self, filename, force_no_save=False):
        """Save the probes as a JSON output on disk

        :param filename: the full JSON filename
        :param force_no_save: if True the probes are built but not written on disk
        :return: a dict that associates each probe target with its function
                 name, positional arguments and keyword arguments
        :raises CreoleOperationError: a probe parameter has attributes other
                                      than `text`, `name` and `index`
        """
        ret = {}
        for variable in self.probe_variables:
            args = []
            kwargs = {}
            for param in getattr(variable, 'param', []):
                # compare as a set: the order in which the attributes were
                # set on the param object is not significant
                param_attrs = set(vars(param)) - {'index'}
                if param_attrs == {'text'}:
                    args.append(param.text)
                elif param_attrs == {'text', 'name'}:
                    kwargs[param.name] = param.text
                else:
                    raise CreoleOperationError(_('unexpected attributes for a probe '
                                                 'parameter: {}').format(vars(param)))
            ret[variable.target] = {'function': variable.name,
                                    'args': args,
                                    'kwargs': kwargs}
        if not force_no_save:
            with open(filename, 'w') as fhj:
                dump(ret, fhj)
        return ret

    def _get_attributes(self, space):  # pylint: disable=R0201
        """Yield the names of the attributes of `space` that do not start with '_'."""
        for attr in dir(space):
            if not attr.startswith('_'):
                yield attr

    def _sub_xml_export(self, name, node, node_name, space, current_space):
        """Export the attribute `name` (whose value is `space`) of `current_space` under `node`.

        Containers (dict and list) and objects are exported as sub-elements,
        other values as properties, text or XML attributes of `node`.
        """
        if isinstance(space, dict):
            space = list(space.values())
        if isinstance(space, list):
            for subspace in space:
                if isinstance(subspace, self.Leadership):
                    _name = 'leader'
                else:
                    _name = name
                    if name in ['services', 'variables', 'actions']:
                        _name = 'family'
                    if HIGH_COMPATIBILITY and not hasattr(subspace, 'doc'):
                        subspace.doc = ''
                if _name == 'value' and (not hasattr(subspace, 'name') or subspace.name is None):
                    continue
                child_node = SubElement(node, _name)
                self._xml_export(child_node, subspace, _name)
        elif isinstance(space, self.Atom):
            if name == 'services':
                child_node = SubElement(node, 'family')
                child_node.attrib['name'] = name
            else:
                child_node = SubElement(node, name)
            for subname in self._get_attributes(space):
                subspace = getattr(space, subname)
                self._sub_xml_export(subname, child_node, name, subspace, space)
        elif isinstance(space, self.Redefinable):
            child_node = SubElement(node, 'family')
            child_node.attrib['name'] = name
            for subname in self._get_attributes(space):
                subspace = getattr(space, subname)
                self._sub_xml_export(subname, child_node, name, subspace, space)
        else:
            # FIXME: this would rather belong in the annotator ...
            if name in PROPERTIES and node.tag in ['variable', 'family', 'leader']:
                if space is True:
                    for prop in CONVERT_PROPERTIES.get(name, [name]):
                        SubElement(node, 'property').text = prop

            elif name not in ERASED_ATTRIBUTES:
                if name == 'name' and node_name in self.forced_text_elts_as_name and not hasattr(current_space, 'param'):
                    if isinstance(space, str):
                        node.text = space
                    else:
                        node.text = str(space)
                elif name == 'text' and node_name in self.forced_text_elts:
                    node.text = space
                elif node.tag == 'family' and name == 'name':
                    if 'doc' not in node.attrib.keys():
                        node.attrib['doc'] = space
                    node.attrib['name'] = normalize_family(space, check_name=False)
                elif node.tag in ['variable', 'family', 'leader'] and name == 'mode':
                    if space is not None:
                        SubElement(node, 'property').text = space
                else:
                    if name in RENAME_ATTIBUTES:
                        name = RENAME_ATTIBUTES[name]
                    if space is not None:
                        node.attrib[name] = str(space)

    def _xml_export(self, node, space, node_name='creole'):
        """Export every public attribute of `space` under the XML `node`."""
        for name in self._get_attributes(space):
            subspace = getattr(space, name)
            self._sub_xml_export(name, node, node_name, subspace, space)


class Path:
    """Helper class to handle the `path` attribute of a CreoleObjSpace
    instance.

    sample: path="creole.general.condition"
    """
    def __init__(self):
        # variable key -> dict(name, family, namespace, leader, creoleobj)
        self.variables = {}
        # family name -> dict(name, namespace, creoleobj)
        self.families = {}

    # Family
    def add_family(self,
                   namespace: str,
                   name: str,
                   creoleobj,
                   ) -> None:  # pylint: disable=C0111
        """Register the family `name` of `namespace` with its object."""
        self.families[name] = dict(name=name,
                                   namespace=namespace,
                                   creoleobj=creoleobj,
                                   )

    def get_family_path(self,
                        name: str,
                        current_namespace: str,
                        ) -> str:  # pylint: disable=C0111
        """Return the path of the family `name`.

        :raises CreoleOperationError: `current_namespace` is None
        :raises CreoleDictConsistencyError: the family is neither in the
                                            'creole' namespace nor in
                                            `current_namespace`
        """
        if current_namespace is None:  # pragma: no cover
            raise CreoleOperationError('current_namespace must not be None')
        dico = self.families[normalize_family(name,
                                              check_name=False,
                                              allow_dot=True,
                                              )]
        if dico['namespace'] != 'creole' and current_namespace != dico['namespace']:
            raise CreoleDictConsistencyError(_('A family located in the {} namespace '
                                               'shall not be used in the {} namespace').format(
                                                   dico['namespace'], current_namespace))
        path = dico['name']
        # a dotted name already includes its namespace
        if dico['namespace'] is not None and '.' not in dico['name']:
            path = '.'.join([dico['namespace'], path])
        return path

    def get_family_namespace(self,
                             name: str,
                             ) -> str:  # pylint: disable=C0111
        """Return the namespace of the family `name` (its own name if it has no namespace)."""
        dico = self.families[name]
        if dico['namespace'] is None:
            return dico['name']
        return dico['namespace']

    def get_family_obj(self,
                       name: str,
                       ) -> 'Family':  # pylint: disable=C0111
        """Return the object registered for the family `name`.

        :raises CreoleDictConsistencyError: the family is unknown
        """
        if name not in self.families:
            raise CreoleDictConsistencyError(_('unknown family {}').format(name))
        dico = self.families[name]
        return dico['creoleobj']

    # Leadership
    def set_leader(self,
                   namespace: str,
                   leader_family_name: str,
                   name: str,
                   leader_name: str,
                   ) -> None:  # pylint: disable=C0111
        """Declare `leader_name` as the leader of the variable `name`.

        Outside of the 'creole' namespace the variable is registered again
        under a path that contains the leader name.

        :raises CreoleDictConsistencyError: the variable already has a leader
        """
        if namespace != 'creole':
            # need rebuild path and move object in new path
            old_path = namespace + '.' + leader_family_name + '.' + name
            dico = self._get_variable(old_path)
            del self.variables[old_path]
            new_path = namespace + '.' + leader_family_name + '.' + leader_name + '.' + name
            self.add_variable(namespace,
                              new_path,
                              family=dico['family'],
                              creoleobj=dico['creoleobj'])
            name = new_path
        dico = self._get_variable(name)
        if dico['leader'] is not None:
            # format after the translation lookup, not before
            raise CreoleDictConsistencyError(_('Already defined leader {} for variable'
                                               ' {}').format(dico['leader'], name))
        dico['leader'] = leader_name

    def get_leader(self, name):  # pylint: disable=C0111
        """Return the leader name of the variable `name` (None if it has no leader)."""
        return self._get_variable(name)['leader']

    # Variable
    def add_variable(self,
                     namespace: str,
                     name: str,
                     family: str = None,
                     creoleobj=None,
                     ) -> None:  # pylint: disable=C0111
        """Register a variable.

        The variable is stored under `name` in the 'creole' namespace or when
        `name` is already a dotted path, under `namespace.family.name`
        otherwise.
        """
        if namespace == 'creole' or '.' in name:
            varname = name
        else:
            varname = '.'.join([namespace, family, name])
        self.variables[varname] = dict(name=name,
                                       family=family,
                                       namespace=namespace,
                                       leader=None,
                                       creoleobj=creoleobj)

    def get_variable_name(self,
                          name,
                          ):  # pylint: disable=C0111
        """Return the registered name of the variable `name`."""
        return self._get_variable(name)['name']

    def get_variable_obj(self,
                         name: str,
                         ) -> 'Variable':  # pylint: disable=C0111
        """Return the object registered for the variable `name`."""
        return self._get_variable(name)['creoleobj']

    def get_variable_family_name(self,
                                 name: str,
                                 ) -> str:  # pylint: disable=C0111
        """Return the family name of the variable `name`."""
        return self._get_variable(name)['family']

    def get_variable_family_path(self,
                                 name: str,
                                 ) -> str:  # pylint: disable=C0111
        """Return the `namespace.family[.leader]` path of the variable `name`."""
        dico = self._get_variable(name)
        list_path = [dico['namespace'], dico['family']]
        if dico['leader'] is not None:
            list_path.append(dico['leader'])
        return '.'.join(list_path)

    def get_variable_namespace(self,
                               name: str,
                               ) -> str:  # pylint: disable=C0111
        """Return the namespace of the variable `name`."""
        return self._get_variable(name)['namespace']

    def get_variable_path(self,
                          name: str,
                          current_namespace: str,
                          allow_source: bool = False,
                          ) -> str:  # pylint: disable=C0111
        """Return the full path of the variable `name`.

        :param allow_source: if True the namespace consistency is not checked
        :raises CreoleOperationError: `current_namespace` is None
        :raises CreoleDictConsistencyError: the variable is neither in the
                                            'creole' or 'services' namespace
                                            nor in `current_namespace`
        """
        if current_namespace is None:  # pragma: no cover
            raise CreoleOperationError('current_namespace must not be None')
        dico = self._get_variable(name)
        if not allow_source:
            if dico['namespace'] not in ['creole', 'services'] and current_namespace != dico['namespace']:
                raise CreoleDictConsistencyError(_('A variable located in the {} namespace '
                                                   'shall not be used in the {} namespace').format(
                                                       dico['namespace'], current_namespace))
        # a dotted name is already a full path
        if '.' in dico['name']:
            return dico['name']
        list_path = [dico['namespace'], dico['family']]
        if dico['leader'] is not None:
            list_path.append(dico['leader'])
        list_path.append(dico['name'])
        return '.'.join(list_path)

    def path_is_defined(self,
                        name: str,
                        ) -> bool:  # pylint: disable=C0111
        """Tell whether a variable is registered under the key `name`."""
        return name in self.variables

    def _get_variable(self,
                      name: str,
                      ) -> dict:
        """Return the record of the variable `name`.

        A path that starts with 'creole.' and is not registered as is, is
        looked up by its last component.

        :raises CreoleDictConsistencyError: the variable is unknown
        """
        if name not in self.variables:
            if name.startswith('creole.'):
                name = name.split('.')[-1]
        if name not in self.variables:
            raise CreoleDictConsistencyError(_('unknown option {}').format(name))
        return self.variables[name]