"""
Creole flattener. Takes a bunch of Creole XML dispatched in different folders
as an input and outputs a human readable flattened XML

Sample usage::

    >>> from creole.objspace import CreoleObjSpace
    >>> eolobj = CreoleObjSpace('/usr/share/creole/creole.dtd')
    >>> eolobj.create_or_populate_from_xml('creole', ['/usr/share/eole/creole/dicos'])
    >>> eolobj.space_visitor(eosfunc_file)  # eosfunc_file is handed over to SpaceAnnotator
    >>> eolobj.save('/tmp/creole_flatened_output.xml')

The CreoleObjSpace

- loads the XML into an internal CreoleObjSpace representation
- visits/annotates the objects
- dumps the object space as XML output into a single XML target

The visit/annotation stage is a complex step that corresponds to the Creole
procedures.

For example: a variable is redefined and shall be moved to another family
means that a variable1 = Variable() object in the object space who lives in the
family1 parent has to be moved in family2. The visit procedure changes the
variable1's object space's parent.
"""
from collections import OrderedDict
from lxml.etree import Element, SubElement  # pylint: disable=E0611

from .i18n import _
from .xmlreflector import XMLReflector, HIGH_COMPATIBILITY
from .annotator import ERASED_ATTRIBUTES, ServiceAnnotator, SpaceAnnotator
from .utils import normalize_family
from .error import CreoleOperationError, SpaceObjShallNotBeUpdated, CreoleDictConsistencyError
from .path import Path

# CreoleObjSpace's elements like 'family' or 'follower', that shall be forced to the Redefinable type
FORCE_REDEFINABLES = ('family', 'follower', 'service', 'disknod', 'variables')
# CreoleObjSpace's elements that shall be forced to the UnRedefinable type
FORCE_UNREDEFINABLES = ('value', 'input', 'profile', 'ewtapp', 'tag', 'saltaction')
# attributes that a redefining node is not allowed to carry
# (see CreoleObjSpace.set_xml_attributes_to_obj)
UNREDEFINABLE = ('submulti', 'multi', 'type')

# boolean attributes exported as <property> children of variable/family/leader nodes
PROPERTIES = ('hidden', 'frozen', 'auto_freeze', 'auto_save', 'force_default_on_freeze',
              'force_store_value', 'disabled', 'mandatory')
# properties that are exported under other name(s) than their attribute name
CONVERT_PROPERTIES = {'auto_save': ['force_store_value'],
                      'auto_freeze': ['force_store_value', 'auto_freeze']}

# attributes renamed at export time
RENAME_ATTIBUTES = {'description': 'doc'}

# each inner list holds attributes that cannot be set together on one object
INCOMPATIBLE_ATTRIBUTES = [['multi', 'submulti']]
# elements whose XML text is stored in the `name` attribute of the object
FORCED_TEXT_ELTS_AS_NAME = ('choice', 'property')

#TYPE_TARGET_CONDITION = ('variable', 'family')

# _____________________________________________________________________________
# special types definitions for the Object Space's internal representation
class RootCreoleObject:
    """Common base class of every type living in the Creole object space"""


class CreoleObjSpace:
    """DOM XML reflexion free internal representation of a Creole Dictionary
    """
    choice = type('Choice', (RootCreoleObject,), OrderedDict())
    property_ = type('Property', (RootCreoleObject,), OrderedDict())
    # Creole ObjectSpace's Leadership variable class type
    Leadership = type('Leadership', (RootCreoleObject,), OrderedDict())
    # This Atom type stands for singleton, that is
    # an Object Space's atom object is present only once in the object space's tree
    Atom = type('Atom', (RootCreoleObject,), OrderedDict())
    # An object that can be redefined (stored by name in an OrderedDict)
    Redefinable = type('Redefinable', (RootCreoleObject,), OrderedDict())
    # An object that can't be redefined (stored in a list)
    UnRedefinable = type('UnRedefinable', (RootCreoleObject,), OrderedDict())

    def __init__(self, dtdfilename):  # pylint: disable=R0912
        """Parse the DTD and build one object space type per DTD element

        :param dtdfilename: path of the DTD file given to the XMLReflector
        """
        # incremented for each parsed XML node, enables us to reorder objects
        self.index = 0

        class ObjSpace:  # pylint: disable=R0903
            """
            Base object space
            """
        self.space = ObjSpace()
        self.paths = Path()
        self.xmlreflector = XMLReflector()
        self.xmlreflector.parse_dtd(dtdfilename)
        # reset to an empty list for each loaded XML document
        self.redefine_variables = None
        # ['variable', 'separator', 'family']
        self.forced_text_elts = set()
        self.forced_text_elts_as_name = set(FORCED_TEXT_ELTS_AS_NAME)
        self.list_conditions = {}
        self.booleans_attributs = []
        self.make_object_space_class()

    @staticmethod
    def _tree_key(tag, name):
        """Return the key under which a `tag` object called `name` is stored:
        family names are normalized, any other name is used as is
        """
        if tag == 'family':
            return normalize_family(name)
        return name

    def make_object_space_class(self):
        """Create Rougail ObjectSpace class types, it enables us
        to create objects like:
        File(), Variable(), Ip(), Family(), Constraints()... and so on.

        Creole ObjectSpace is an object's reflexion of the XML elements

        One type is created per DTD element and stored as an attribute of
        `self` named after the element; attribute default values declared in
        the DTD become class attributes of this type.
        """
        for dtd_elt in self.xmlreflector.dtd.iterelements():
            attrs = {}
            if dtd_elt.name in FORCE_REDEFINABLES:
                clstype = self.Redefinable
            else:
                clstype = self.UnRedefinable
            # candidate for the Atom type unless its type is forced
            # or, see below, the element declares at least one attribute
            atomic = (dtd_elt.name not in FORCE_UNREDEFINABLES
                      and dtd_elt.name not in FORCE_REDEFINABLES)
            forced_text_elt = dtd_elt.type == 'mixed'
            for dtd_attr in dtd_elt.iterattributes():
                atomic = False
                if set(dtd_attr.itervalues()) == {'True', 'False'}:
                    # it's a boolean
                    self.booleans_attributs.append(dtd_attr.name)
                if dtd_attr.default_value:
                    # set default value for this attribute
                    default_value = dtd_attr.default_value
                    if dtd_attr.name in self.booleans_attributs:
                        default_value = self.convert_boolean(dtd_attr.default_value)
                    attrs[dtd_attr.name] = default_value
                if dtd_attr.name == 'redefine':
                    # has a redefine attribute, so it's a Redefinable object
                    clstype = self.Redefinable
                if dtd_attr.name == 'name' and forced_text_elt:
                    # mixed element that already has a "name" attribute:
                    # its text is registered in forced_text_elts
                    self.forced_text_elts.add(dtd_elt.name)
                    forced_text_elt = False

            if forced_text_elt is True:
                # mixed element without "name" attribute:
                # its text is stored as the object's name
                self.forced_text_elts_as_name.add(dtd_elt.name)
            if atomic:
                # has no attribute at all, so it's an Atomic object
                clstype = self.Atom

            # create ObjectSpace object
            setattr(self, dtd_elt.name, type(dtd_elt.name.capitalize(), (clstype,), attrs))

    def create_or_populate_from_xml(self, namespace, xmlfolders):
        """Parses a bunch of XML files
        populates the CreoleObjSpace

        :param namespace: namespace the loaded dictionaries belong to
        :param xmlfolders: folders given to XMLReflector.load_xml_from_folders
        """
        for _xmlfile, document in self.xmlreflector.load_xml_from_folders(xmlfolders):
            # redefined variables are tracked per document
            self.redefine_variables = []
            self.xml_parse_document(document,
                                    self.space,
                                    namespace,
                                    )

    def xml_parse_document(self, document, space, namespace):
        """Parses a Creole XML file
        populates the CreoleObjSpace

        :param document: XML node whose children are loaded
        :param space: object space that receives the children of `document`
        :param namespace: namespace of the current dictionary
        :raises CreoleDictConsistencyError: a family is declared twice under
                                            the same `document` node
        """
        family_names = set()
        for child in document:
            # this index enables us to reorder objects
            self.index += 1
            # doesn't proceed the XML commentaries
            if not isinstance(child.tag, str):
                continue
            if child.tag == 'family':
                if child.attrib['name'] in family_names:
                    raise CreoleDictConsistencyError(_('Family {} is set several times').format(child.attrib['name']))
                family_names.add(child.attrib['name'])
            if child.tag == 'variables':
                child.attrib['name'] = namespace
            if HIGH_COMPATIBILITY and child.tag == 'value' and child.text is None:
                # FIXME should not be here
                continue
            # creole objects creation
            try:
                creoleobj = self.generate_creoleobj(child,
                                                    space,
                                                    namespace,
                                                    )
            except SpaceObjShallNotBeUpdated:
                continue
            self.set_text_to_obj(child,
                                 creoleobj,
                                 )
            self.set_xml_attributes_to_obj(child,
                                           creoleobj,
                                           )
            self.creoleobj_tree_visitor(child,
                                        creoleobj,
                                        namespace,
                                        )
            self.fill_creoleobj_path_attribute(space,
                                               child,
                                               namespace,
                                               document,
                                               creoleobj,
                                               )
            self.add_to_tree_structure(creoleobj,
                                       space,
                                       child,
                                       )
            if len(child) != 0:
                # the node has children: load them inside the new object
                self.xml_parse_document(child,
                                        creoleobj,
                                        namespace,
                                        )

    def generate_creoleobj(self, child, space, namespace):
        """
        instanciates or creates Creole Object Subspace objects

        :return: the object space object that corresponds to the `child` node
        :raises SpaceObjShallNotBeUpdated: propagated from
                create_or_update_redefinable_object
        """
        creoleobj = getattr(self, child.tag)()
        if isinstance(creoleobj, self.Redefinable):
            creoleobj = self.create_or_update_redefinable_object(child.attrib,
                                                                 space,
                                                                 child,
                                                                 namespace,
                                                                 )
        elif isinstance(creoleobj, self.Atom) and child.tag in vars(space):
            # instanciates an object from the CreoleObjSpace's builtins types
            # example : child.tag = constraints -> a self.Constraints() object is created
            # this Atom instance has to be a singleton here
            # we do not re-create it, we reuse it
            creoleobj = getattr(space, child.tag)
        self.create_tree_structure(space,
                                   child,
                                   creoleobj,
                                   )
        return creoleobj

    def create_or_update_redefinable_object(self, subspace, space, child, namespace):
        """Creates or retrieves the space object that corresponds
        to the `child` XML object

        Two boolean flag attributes of the `child` XML object are important:
        `redefine` and `exists`.

        When the object is already present in the object space:

        - `redefine` is True (this is the default for the elements listed in
          FORCE_REDEFINABLES): the existing space object is returned in order
          to be updated
        - otherwise, `exists` is False: nothing shall be modified
        - otherwise (`exists` defaults to True): this is an error

        When the object is not present in the object space:

        - `redefine` is False (the default) or `exists` is True: a new space
          object is created
        - otherwise: this is an error, a redefined object must already exist

        :param subspace: the attributes of the `child` XML object
        :return: the corresponding space object of the `child` XML object
        :raises SpaceObjShallNotBeUpdated: the object exists and must be left
                                           untouched
        :raises CreoleDictConsistencyError: re-creation of an existing object
                                            or redefinition of a missing one
        """
        if child.tag in self.forced_text_elts_as_name:
            name = child.text
        else:
            name = subspace['name']
        if self.is_already_exists(name,
                                  space,
                                  child,
                                  namespace,
                                  ):
            default_redefine = child.tag in FORCE_REDEFINABLES
            redefine = self.convert_boolean(subspace.get('redefine', default_redefine))
            exists = self.convert_boolean(subspace.get('exists', True))
            if redefine is True:
                return self.translate_in_space(name,
                                               space,
                                               child,
                                               namespace,
                                               )
            if exists is False:
                raise SpaceObjShallNotBeUpdated()
            # translate the template first, then interpolate: the catalogue
            # lookup cannot match an already interpolated message
            raise CreoleDictConsistencyError(_('Already present in another XML file, {} cannot be re-created').format(name))
        redefine = self.convert_boolean(subspace.get('redefine', False))
        exists = self.convert_boolean(subspace.get('exists', False))
        if redefine is False or exists is True:
            return getattr(self, child.tag)()
        raise CreoleDictConsistencyError(_('Redefined object: {} does not exist yet').format(name))

    def create_tree_structure(self, space, child, creoleobj):  # pylint: disable=R0201
        """
        Builds the tree structure of the object space here
        we set services attributes in order to be populated later on
        for example::

            space = Family()
            space.variable = OrderedDict()

        another example:

            space = Variable()
            space.value = list()

        Nothing is done when `space` already has a `child.tag` attribute.

        :raises CreoleOperationError: `creoleobj` is neither Redefinable,
                                      UnRedefinable nor Atom
        """
        if child.tag in vars(space):
            return
        if isinstance(creoleobj, self.Redefinable):
            setattr(space, child.tag, OrderedDict())
        elif isinstance(creoleobj, self.UnRedefinable):
            setattr(space, child.tag, [])
        elif not isinstance(creoleobj, self.Atom):  # pragma: no cover
            raise CreoleOperationError(_("Creole object {} "
                                         "has a wrong type").format(type(creoleobj)))

    def is_already_exists(self, name, space, child, namespace):
        """Tell whether an object called `name` is already known

        Inside a family the name is searched in the registered paths
        (prefixed by the family path outside of the 'creole' namespace),
        otherwise in the `child.tag` container of `space`.
        """
        if isinstance(space, self.family):  # pylint: disable=E1101
            if namespace != 'creole':
                name = space.path + '.' + name
            return self.paths.path_is_defined(name)
        return self._tree_key(child.tag, name) in getattr(space, child.tag, {})

    def convert_boolean(self, value):  # pylint: disable=R0201
        """Boolean coercion. The Creole XML may contain strings like `True` or `False`

        :param value: a bool, or one of the strings 'True' and 'False'
        :raises TypeError: for any other value
        """
        if isinstance(value, bool):
            return value
        if value == 'True':
            return True
        if value == 'False':
            return False
        raise TypeError(_('{} is not True or False').format(value))  # pragma: no cover

    def translate_in_space(self, name, family, variable, namespace):
        """Retrieve the already existing object called `name`

        When `family` is a family object and the object is currently
        registered in another family, the object is moved into `family`
        and its path is registered again.

        :param family: the space that shall contain the object
        :param variable: the XML node of the redefined object
        :return: the existing space object
        """
        if not isinstance(family, self.family):  # pylint: disable=E1101
            return getattr(family, variable.tag)[self._tree_key(variable.tag, name)]
        if namespace == 'creole':
            path = name
        else:
            path = family.path + '.' + name
        old_family_name = self.paths.get_variable_family_name(path)
        if normalize_family(family.name) == old_family_name:
            # still in the same family, nothing to move
            return getattr(family, variable.tag)[name]
        # NOTE(review): the previous family is always looked up in the
        # 'creole' namespace -- confirm this is intended for other namespaces
        old_family = self.space.variables['creole'].family[old_family_name]  # pylint: disable=E1101
        variable_obj = old_family.variable[name]
        del old_family.variable[name]
        if 'variable' not in vars(family):
            family.variable = OrderedDict()
        family.variable[name] = variable_obj
        self.paths.add_variable(namespace,
                                name,
                                family.name,
                                False,
                                variable_obj,
                                )
        return variable_obj

    def remove_check(self, name):  # pylint: disable=C0111
        """Remove every check of the constraints whose target is `name`

        Nothing is done when the space has no constraints or no check.
        """
        if not (hasattr(self.space, 'constraints')
                and hasattr(self.space.constraints, 'check')):
            return
        checks = self.space.constraints.check  # pylint: disable=E1101
        # walk backwards so that a removal never shifts the indexes still to visit
        for idx in reversed(range(len(checks))):
            check = checks[idx]
            if hasattr(check, 'target') and check.target == name:
                checks.pop(idx)

    def remove_condition(self, name):  # pylint: disable=C0111
        """Remove the targets called `name` from every condition of the constraints

        Nothing is done when the space has no constraints or no condition.
        """
        if not (hasattr(self.space, 'constraints')
                and hasattr(self.space.constraints, 'condition')):
            return
        for condition in self.space.constraints.condition:  # pylint: disable=E1101
            if not hasattr(condition, 'target'):
                continue
            targets = condition.target
            # walk backwards so that a removal never shifts the indexes still to visit
            for target_idx in reversed(range(len(targets))):
                if targets[target_idx].name == name:
                    del targets[target_idx]

    def add_to_tree_structure(self, creoleobj, space, child):  # pylint: disable=R0201
        """Attach `creoleobj` to `space`

        - Redefinable objects are stored by name in the `child.tag` mapping
        - UnRedefinable objects are appended to the `child.tag` list
        - other objects are set as the `child.tag` attribute itself
        """
        if isinstance(creoleobj, self.Redefinable):
            getattr(space, child.tag)[self._tree_key(child.tag, creoleobj.name)] = creoleobj
        elif isinstance(creoleobj, self.UnRedefinable):
            getattr(space, child.tag).append(creoleobj)
        else:
            setattr(space, child.tag, creoleobj)

    def set_text_to_obj(self, child, creoleobj):
        """Copy the stripped text of the XML node, if not empty, to the object

        The text is stored in `name` for the elements of
        `forced_text_elts_as_name`, in `text` otherwise.
        """
        if child.text is None:
            return
        text = child.text.strip()
        if not text:
            return
        if child.tag in self.forced_text_elts_as_name:
            creoleobj.name = text
        else:
            creoleobj.text = text

    def set_xml_attributes_to_obj(self, child, creoleobj):
        """Copy the attributes of the XML node to the object

        Boolean attributes are converted, an already set `name` is never
        overwritten, and the values of a redefined variable are dropped
        when the redefinition comes with new ones.

        :raises CreoleDictConsistencyError: a redefinition sets an attribute
                listed in UNREDEFINABLE, or the object ends up with
                incompatible attributes
        """
        redefine = self.convert_boolean(child.attrib.get('redefine', False))
        has_value = hasattr(creoleobj, 'value')
        if HIGH_COMPATIBILITY and has_value:
            has_value = len(child) != 1 or child[0].text is not None
        if redefine is True and child.tag == 'variable' and has_value and len(child) != 0:
            # values set by the redefinition replace the previous ones
            del creoleobj.value
        for attr, val in child.attrib.items():
            if redefine and attr in UNREDEFINABLE:
                # UNREDEFINABLE concerns only 'variable' node so we can fix name
                # to child.attrib['name']
                name = child.attrib['name']
                # translate the template first, then interpolate
                raise CreoleDictConsistencyError(_('cannot redefine attribute {} for variable {}').format(attr, name))
            if attr in self.booleans_attributs:
                val = self.convert_boolean(val)
            if attr == 'name' and getattr(creoleobj, 'name', None) is not None:
                # the name is already set, keep it
                continue
            setattr(creoleobj, attr, val)
        keys = vars(creoleobj)
        for incompatible in INCOMPATIBLE_ATTRIBUTES:
            # at most one attribute of each group may be set on the object
            if sum(1 for inc in incompatible if inc in keys) > 1:
                raise CreoleDictConsistencyError(_('those attributes are incompatible {}').format(incompatible))

    def creoleobj_tree_visitor(self, child, creoleobj, namespace):
        """Creole object tree manipulations

        - removes the checks/conditions of a variable when asked to
        - flags fill/check objects whose target is redefined in the
          current dictionary
        - sets the index (if not already set) and the namespace
        """
        if child.tag == 'variable':
            # attribute values are strings: 'False' must not be seen as true
            if self.convert_boolean(child.attrib.get('remove_check', False)):
                self.remove_check(creoleobj.name)
            if self.convert_boolean(child.attrib.get('remove_condition', False)):
                self.remove_condition(creoleobj.name)
        if child.tag in ['fill', 'check']:
            # if variable is a redefine in current dictionary
            # XXX not working with variable not in creole and in leader/followers
            creoleobj.redefine = child.attrib['target'] in self.redefine_variables
        if not hasattr(creoleobj, 'index'):
            creoleobj.index = self.index
        if child.tag in ['fill', 'condition', 'check', 'action']:
            creoleobj.namespace = namespace

    def fill_creoleobj_path_attribute(self, space, child, namespace, document, creoleobj):  # pylint: disable=R0913
        """Fill self.paths attributes

        Variables and families are registered in `self.paths`; redefined
        variables are also recorded in `self.redefine_variables`.
        Nothing is done for the children of a help object.
        """
        if isinstance(space, self.help):  # pylint: disable=E1101
            return
        if child.tag == 'variable':
            family_name = normalize_family(document.attrib['name'])
            self.paths.add_variable(namespace,
                                    child.attrib['name'],
                                    family_name,
                                    document.attrib.get('dynamic') is not None,
                                    creoleobj)
            if child.attrib.get('redefine', 'False') == 'True':
                if namespace == 'creole':
                    self.redefine_variables.append(child.attrib['name'])
                else:
                    self.redefine_variables.append(namespace + '.' + family_name + '.' +
                                                   child.attrib['name'])
        elif child.tag == 'family':
            family_name = normalize_family(child.attrib['name'])
            if namespace != 'creole':
                family_name = namespace + '.' + family_name
            self.paths.add_family(namespace,
                                  family_name,
                                  creoleobj,
                                  )
            creoleobj.path = self.paths.get_family_path(family_name,
                                                        namespace)

    def space_visitor(self, eosfunc_file):  # pylint: disable=C0111
        """Run the annotators on the object space

        :param eosfunc_file: given as is to SpaceAnnotator
        """
        ServiceAnnotator(self)
        SpaceAnnotator(self, eosfunc_file)

    def save(self, filename, force_no_save=False):
        """Save an XML output on disk

        :param filename: the full XML filename
        :param force_no_save: when True the XML tree is built but not written
        :return: the root node of the exported XML tree
        """
        xml = Element('rougail')
        self._xml_export(xml, self.space)
        if not force_no_save:
            self.xmlreflector.save_xmlfile(filename, xml)
        return xml

    def get_attributes(self, space):  # pylint: disable=R0201
        """Yield the names of the attributes of `space` not starting with `_`"""
        for attr in dir(space):
            if not attr.startswith('_'):
                yield attr

    def _sub_xml_export(self, name, node, node_name, space, current_space):
        """Export the attribute `name` (whose value is `space`) of
        `current_space` below the XML `node`

        :param name: name of the exported attribute
        :param node: XML node that receives the export
        :param node_name: element name `node` was exported for
        :param space: value of the attribute
        :param current_space: object that owns the attribute
        """
        if isinstance(space, dict):
            space = list(space.values())
        if isinstance(space, list):
            for subspace in space:
                # NOTE(review): nesting of the next conditions was rebuilt
                # from a flattened source -- confirm against the history
                if isinstance(subspace, self.Leadership):
                    _name = 'leader'
                else:
                    _name = name
                    if name in ['services', 'variables', 'actions']:
                        _name = 'family'
                        if HIGH_COMPATIBILITY and not hasattr(subspace, 'doc'):
                            subspace.doc = ''
                if _name == 'value' and (not hasattr(subspace, 'name') or subspace.name is None):
                    continue
                child_node = SubElement(node, _name)
                self._xml_export(child_node, subspace, _name)
        elif isinstance(space, self.Atom):
            if name == 'services':
                child_node = SubElement(node, 'family')
                child_node.attrib['name'] = name
            else:
                child_node = SubElement(node, name)
            for subname in self.get_attributes(space):
                subspace = getattr(space, subname)
                self._sub_xml_export(subname, child_node, name, subspace, space)
        elif isinstance(space, self.Redefinable):
            child_node = SubElement(node, 'family')
            child_node.attrib['name'] = name
            for subname in self.get_attributes(space):
                subspace = getattr(space, subname)
                self._sub_xml_export(subname, child_node, name, subspace, space)
        else:
            # FIXME should rather live in the annotator ...
            if name in PROPERTIES and node.tag in ['variable', 'family', 'leader']:
                # only the properties set to True are exported
                if space is True:
                    for prop in CONVERT_PROPERTIES.get(name, [name]):
                        SubElement(node, 'property').text = prop

            elif name not in ERASED_ATTRIBUTES:
                if name == 'name' and node_name in self.forced_text_elts_as_name and not hasattr(current_space, 'param'):
                    # the name of those elements is exported as the node's text
                    node.text = space if isinstance(space, str) else str(space)
                elif name == 'text' and node_name in self.forced_text_elts:
                    node.text = space
                elif node.tag == 'family' and name == 'name':
                    # the raw name is used as doc when no doc is set yet
                    if 'doc' not in node.attrib.keys():
                        node.attrib['doc'] = space
                    node.attrib['name'] = normalize_family(space, check_name=False)
                elif node.tag in ['variable', 'family', 'leader'] and name == 'mode':
                    if space is not None:
                        SubElement(node, 'property').text = space
                else:
                    if name in RENAME_ATTIBUTES:
                        name = RENAME_ATTIBUTES[name]
                    if space is not None:
                        node.attrib[name] = str(space)

    def _xml_export(self, node, space, node_name='creole'):
        """Export every public attribute of `space` below the XML `node`"""
        for name in self.get_attributes(space):
            subspace = getattr(space, name)
            self._sub_xml_export(name, node, node_name, subspace, space)