diff --git a/src/rougail/__init__.py b/src/rougail/__init__.py index a36f3a94..c6cb4ae5 100644 --- a/src/rougail/__init__.py +++ b/src/rougail/__init__.py @@ -27,5 +27,6 @@ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA from .convert import RougailConvert from .template.systemd import RougailSystemdTemplate from .config import RougailConfig +from rougail.update import RougailUpgrade -__ALL__ = ('RougailConvert', 'RougailSystemdTemplate', 'RougailConfig') +__ALL__ = ('RougailConvert', 'RougailSystemdTemplate', 'RougailConfig', 'RougailUpgrade') diff --git a/src/rougail/error.py b/src/rougail/error.py index 68cd61a7..7795378c 100644 --- a/src/rougail/error.py +++ b/src/rougail/error.py @@ -70,3 +70,8 @@ class DictConsistencyError(Exception): msg = _(f'{msg} in {display_xmlfiles(xmlfiles)}') super().__init__(msg) self.errno = errno + + +class UpgradeError(Exception): + """Error during XML upgrade + """ diff --git a/src/rougail/update.py b/src/rougail/update.py new file mode 100644 index 00000000..e0e2028d --- /dev/null +++ b/src/rougail/update.py @@ -0,0 +1,486 @@ +"""Update Rougail XML file to new version + +Cadoles (http://www.cadoles.com) +Copyright (C) 2021 + +distribued with GPL-2 or later license + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; either version 2 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +""" + +from typing import List +from os.path import join, isfile, basename +from os import listdir +from lxml.etree import DTD, parse, XMLParser, XMLSyntaxError # pylint: disable=E0611 +from lxml.etree import Element, SubElement, tostring + +from .i18n import _ +from .error import UpgradeError + + +VERSIONS = {'creole': ['1'], + 'rougail': ['0.9'], + } + + +def get_function_name(root, version): + version = version.replace('.', '_') + return f'update_{root}_{version}' + + +FUNCTION_VERSIONS = [get_function_name(root, version) for root, versions in VERSIONS.items() for version in versions] + + +class RougailUpgrade: + def __init__(self, test=False, upgrade_help=None): + self.test = test + if upgrade_help is None: + upgrade_help = {} + self.upgrade_help = upgrade_help + + def load_xml_from_folders(self, + srcfolder: str, + dstfolder: str, + ): + """Loads all the XML files located in the xmlfolders' list + + :param xmlfolders: list of full folder's name + """ + filenames = [filename for filename in listdir(srcfolder) if filename.endswith('.xml')] + filenames.sort() + for filename in filenames: + xmlsrc = join(srcfolder, filename) + xmldst = join(dstfolder, filename) + try: + parser = XMLParser(remove_blank_text=True) + document = parse(xmlsrc, parser) + except XMLSyntaxError as err: + raise Exception(_(f'not a XML file: {err}')) from err + root = document.getroot() + search_function_name = get_function_name(root.tag, root.attrib.get('version', '1')) + function_found = False + for function_version in FUNCTION_VERSIONS: + if function_found and hasattr(self, function_version): + upgrade_help = self.upgrade_help.get(function_version, {}).get(filename, {}) + root = getattr(self, function_version)(root, upgrade_help) + if function_version == search_function_name: + function_found = True + with open(xmldst, 'wb') as xmlfh: + xmlfh.write(tostring(root, pretty_print=True, encoding="UTF-8", xml_declaration=True)) + # if + # if not self.dtd.validate(document): + # dtd_error = self.dtd.error_log.filter_from_errors()[0] + # msg = _(f'not a valid XML file: {dtd_error}') + # raise DictConsistencyError(msg, 43, [xmlfile]) + # yield xmlfile, document.getroot() + + + def update_rougail_0_9(self, + root: 'Element', + upgrade_help: dict, + ) -> 'Element': + # rename root + root.tag = 'rougail' + root.attrib['version'] = '0.9' + variables_auto_valid_enum = {} + variables_help = {} + families_help = {} + variables = {} + families = {} + valid_enums = {} + current_service = upgrade_help.get('services', {}).get('default', 'unknown') + files = {current_service: {}} + ips = {current_service: []} + servicelists = {} + test_unknowns_variables = set() + autos = [] + constraints_obj = None + for subelement in root: + if not isinstance(subelement.tag, str): + # XML comment + continue + for subsubelement in subelement: + if not isinstance(subsubelement.tag, str): + # XML comment + continue + for subsubsubelement in subsubelement: + if not isinstance(subsubsubelement.tag, str): + # XML comment + continue + if subsubsubelement.tag == 'variable': + if subsubsubelement.attrib['name'] in upgrade_help.get('variables', {}).get('remove', []): + subsubelement.remove(subsubsubelement) + continue + if subsubsubelement.attrib['name'] in upgrade_help.get('variables', {}).get('type', {}): + subsubsubelement.attrib['type'] = upgrade_help['variables']['type'][subsubsubelement.attrib['name']] + if subsubsubelement.attrib['name'] in upgrade_help.get('variables', {}).get('hidden', {}).get('add', []): + subsubsubelement.attrib['hidden'] = 'True' + if subsubsubelement.attrib['name'] in upgrade_help.get('variables', {}).get('hidden', {}).get('remove', []): + self.remove(subsubsubelement, 'hidden', optional=True) + if subsubsubelement.attrib['name'] in upgrade_help.get('variables', {}).get('mandatory', {}).get('remove', []): + self.remove(subsubsubelement, 'mandatory', optional=True) + variables[subsubsubelement.attrib['name']] = subsubsubelement + type = subsubsubelement.attrib.get('type') + if type in ['oui/non', 'yes/no', 'on/off']: + variables_auto_valid_enum.setdefault(subsubsubelement.attrib['type'], []).append(subsubsubelement.attrib['name']) + del subsubsubelement.attrib['type'] + elif type == 'hostname_strict': + subsubsubelement.attrib['type'] = 'hostname' + elif type == 'domain_strict': + subsubsubelement.attrib['type'] = 'domain' + elif type == 'string': + del subsubsubelement.attrib['type'] + if self.test and subsubsubelement.attrib.get('auto_freeze') == 'True': + del subsubsubelement.attrib['auto_freeze'] + #if self.test and subsubsubelement.attrib.get('redefine') == 'True': + # subsubsubelement.attrib['exists'] = 'False' + # del subsubsubelement.attrib['redefine'] + if subsubsubelement.attrib['name'] in upgrade_help.get('variables', {}).get('redefine', {}).get('remove', {}): + del subsubsubelement.attrib['redefine'] + if subsubsubelement.attrib['name'] in upgrade_help.get('variables', {}).get('remove_check', []): + subsubsubelement.attrib['remove_check'] = 'True' + if subsubsubelement.attrib['name'] in upgrade_help.get('variables', {}).get('mode', {}).get('modify', {}): + subsubsubelement.attrib['mode'] = upgrade_help.get('variables', {}).get('mode', {}).get('modify', {})[subsubsubelement.attrib['name']] + if subsubsubelement.attrib['name'] in upgrade_help.get('variables', {}).get('test', {}): + subsubsubelement.attrib['test'] = upgrade_help.get('variables', {}).get('test', {})[subsubsubelement.attrib['name']] +# for value in subsubsubelement: +# if value.text is None: +# value.attrib['type'] = 'nil' + elif subsubsubelement.tag == 'param': + type = subsubsubelement.attrib.get('type') + if type == 'eole': + subsubsubelement.attrib['type'] = 'variable' + type = 'variable' + elif type in ('container', 'context', 'python'): + raise UpgradeError(_(f'cannot convert param with type "{type}"')) + if subsubelement.attrib['name'] == 'valid_entier' and not 'type' in subsubsubelement.attrib: + subsubsubelement.attrib['type'] = 'number' + if subsubelement.attrib['name'] == 'valid_enum' and not type: + if subsubsubelement.attrib.get('name') == 'checkval': + raise UpgradeError(_('checkval in valid_enum is no more supported')) + for val in eval(subsubsubelement.text): + SubElement(subsubelement, 'param').text = str(val) + subsubelement.remove(subsubsubelement) + self.move(subsubsubelement, 'hidden', 'propertyerror', optional=True) + if type == 'variable' and subsubsubelement.text in upgrade_help.get('variables', {}).get('rename', []): + subsubsubelement.text = upgrade_help.get('variables', {}).get('rename', [])[subsubsubelement.text] + up = upgrade_help.get('params', {}).get(subsubsubelement.text) + if up: + if 'text' in up: + subsubsubelement.text = up['text'] + if 'type' in up: + subsubsubelement.attrib['type'] = up['type'] + elif subsubsubelement.tag == 'target': + type = subsubsubelement.attrib.get('type') + if type in ['service_accesslist', 'service_restrictionlist', 'interfacelist', 'hostlist']: + subsubelement.remove(subsubsubelement) +# elif self.test and type in ['filelist', 'variable', 'servicelist']: +# subsubsubelement.attrib['optional'] = "True" + elif (not type or type == 'variable') and subsubsubelement.text in upgrade_help.get('variables', {}).get('remove', []): + subsubelement.remove(subsubsubelement) + elif type == 'family' and subsubsubelement.text in upgrade_help.get('families', {}).get('remove', []): + subsubelement.remove(subsubsubelement) + up = upgrade_help.get('targets', {}).get(subsubsubelement.text) + if up: + if 'optional' in up: + subsubsubelement.attrib['optional'] = up['optional'] + has_target = False + for target in subsubelement: + if target.tag == 'target': + has_target = True + break + if not has_target: + subelement.remove(subsubelement) + continue + elif subsubsubelement.tag == 'slave': + subsubsubelement.tag = 'follower' + if subelement.tag == 'containers': + current_service = self.upgrade_container(subsubsubelement, current_service, files, ips, servicelists, upgrade_help) + subsubelement.remove(subsubsubelement) + if subelement.tag == 'help': + if subsubelement.tag == 'variable': + if not subsubelement.attrib['name'] in upgrade_help.get('variables', {}).get('remove', []): + variables_help[subsubelement.attrib['name']] = subsubelement.text + elif subsubelement.tag == 'family': + if subsubelement.attrib['name'] not in upgrade_help.get('families', {}).get('remove', []): + families_help[subsubelement.attrib['name']] = subsubelement.text + else: + raise Exception(f'unknown help tag {subsubelement.tag}') + subelement.remove(subsubelement) + continue + if subsubelement.tag == 'auto': + if subsubelement.attrib['target'] in upgrade_help.get('variables', {}).get('remove', []): + subelement.remove(subsubelement) + continue + autos.append(subsubelement.attrib['target']) + subsubelement.tag = 'fill' + if subsubelement.tag in ['fill', 'check']: + if subsubelement.tag == 'check' and subsubelement.attrib['name'] == 'valid_enum': + if subsubelement.attrib['target'] in upgrade_help.get('variables', {}).get('remove', []): + subelement.remove(subsubelement) + continue + params = [] + for param in subsubelement: + if param.tag == 'param': + params.append(param.text) + key = '~'.join(params) + if key in valid_enums: + SubElement(valid_enums[key], 'target').text = subsubelement.attrib['target'] + subelement.remove(subsubelement) + continue + valid_enums['~'.join(params)] = subsubelement + if subsubelement.tag == 'fill' and subsubelement.attrib['target'] in upgrade_help.get('fills', {}).get('remove', []): + subelement.remove(subsubelement) + continue + if subsubelement.attrib['target'] in upgrade_help.get('variables', {}).get('remove', []): + subelement.remove(subsubelement) + continue + if subsubelement.attrib['name'] == 'valid_networknetmask': + subsubelement.attrib['name'] = 'valid_network_netmask' + target = SubElement(subsubelement, 'target') + target.text = subsubelement.attrib['target'] + if self.test: + target.attrib['optional'] = 'True' + del subsubelement.attrib['target'] + elif subsubelement.tag == 'separators': + subelement.remove(subsubelement) + elif subsubelement.tag == 'condition': + if subsubelement.attrib['source'] in upgrade_help.get('variables', {}).get('remove', []): + try: + subelement.remove(subsubelement) + except: + pass + else: + if subsubelement.attrib['name'] == 'hidden_if_not_in': + subsubelement.attrib['name'] = 'disabled_if_not_in' + if subsubelement.attrib['name'] == 'hidden_if_in': + subsubelement.attrib['name'] = 'disabled_if_in' + if subsubelement.attrib['name'] == 'frozen_if_in': + subsubelement.attrib['name'] = 'hidden_if_in' + self.move(subsubelement, 'fallback', 'optional', optional='True') + if subsubelement.attrib['source'] in upgrade_help.get('variables', {}).get('rename', []): + subsubelement.attrib['source'] = upgrade_help.get('variables', {}).get('rename', [])[subsubelement.attrib['source']] + elif subsubelement.tag == 'group': + if subsubelement.attrib['master'] in upgrade_help.get('groups', {}).get('remove', []): + subelement.remove(subsubelement) + else: + self.move(subsubelement, 'master', 'leader') + if subsubelement.attrib['leader'] in upgrade_help.get('groups', {}).get('reorder', {}): + for follower in subsubelement: + subsubelement.remove(follower) + for follower in upgrade_help.get('groups', {}).get('reorder', {})[subsubelement.attrib['leader']]: + SubElement(subsubelement, 'follower').text = follower + if subelement.tag == 'files': + current_service = self.upgrade_container(subsubelement, current_service, files, ips, servicelists, upgrade_help) + subelement.remove(subsubelement) + if subsubelement.tag == 'family': + self.remove(subsubelement, 'icon', optional=True) + if subsubelement.attrib['name'] in upgrade_help.get('families', {}).get('mode', {}).get('remove', []) and 'mode' in subsubelement.attrib: + del subsubelement.attrib['mode'] + if subsubelement.attrib['name'] in upgrade_help.get('families', {}).get('rename', {}): + subsubelement.attrib['name'] = upgrade_help.get('families', {}).get('rename', {})[subsubelement.attrib['name']] + if subsubelement.attrib['name'] in upgrade_help.get('families', {}).get('remove', []): + subelement.remove(subsubelement) + continue + if subsubelement.attrib['name'] in upgrade_help.get('families', {}).get('hidden', {}).get('add', []): + subsubelement.attrib['hidden'] = 'True' + families[subsubelement.attrib['name']] = subsubelement + # if empty, remove + if is_empty(subelement) or subelement.tag in ['containers', 'files', 'help']: + root.remove(subelement) + continue + # copy subelement + if subelement.tag == 'constraints': + constraints_obj = subelement + services = None + service_elt = {} + for variable, obj in upgrade_help.get('variables', {}).get('add', {}).items(): + family = next(iter(families.values())) + variables[variable] = SubElement(family, 'variable', name=variable) + if 'value' in obj: + SubElement(variables[variable], 'value').text = obj['value'] + for name in ['hidden', 'exists']: + if name in obj: + variables[variable].attrib[name] = obj[name] + files_is_empty = True + for lst in files.values(): + if len(lst): + files_is_empty = False + break + if not files_is_empty: + services = Element('services') + root.insert(0, services) + for service_name, lst in files.items(): + service = self.create_service(services, service_name, service_elt, servicelists, upgrade_help) + for file_ in lst.values(): + self.move(file_, 'name_type', 'file_type', optional=True) + if 'file_type' in file_.attrib and file_.attrib['file_type'] == 'SymLinkOption': + file_.attrib['file_type'] = 'variable' + if variables[file_.text].attrib['type'] == 'string': + variables[file_.text].attrib['type'] = 'filename' + self.remove(file_, 'rm', optional=True) + self.remove(file_, 'mkdir', optional=True) + service.append(file_) + ips_is_empty = True + for lst in ips.values(): + if len(lst): + ips_is_empty = False + break + if not ips_is_empty: + if services is None: + services = Element('services') + root.insert(0, services) + for service_name, lst in ips.items(): + service = self.create_service(services, service_name, service_elt, servicelists, upgrade_help) + for ip in lst: + if ip.text in upgrade_help.get('ips', {}).get('remove', []): + continue + for type in ['ip_type', 'netmask_type']: + if type in ip.attrib and ip.attrib[type] == 'SymLinkOption': + ip.attrib[type] = 'variable' + if 'netmask' in ip.attrib: + if ip.attrib['netmask'] == '255.255.255.255': + del ip.attrib['netmask'] + if ip.text in variables and 'type' not in variables[ip.text].attrib: + variables[ip.text].attrib['type'] = 'ip' + else: + if ip.text in variables and 'type' not in variables[ip.text].attrib: + variables[ip.text].attrib['type'] = 'network' + if ip.attrib['netmask'] in variables and 'type' not in variables[ip.attrib['netmask']].attrib: + variables[ip.attrib['netmask']].attrib['type'] = 'netmask' + elif ip.text in variables and 'type' not in variables[ip.text].attrib: + variables[ip.text].attrib['type'] = 'ip' + self.remove(ip, 'interface') + self.remove(ip, 'interface_type', optional=True) + self.remove(ip, 'service_restrictionlist', optional=True) + service.append(ip) + if ips_is_empty and files_is_empty: + for service_name in files: + if services is None: + services = Element('services') + root.insert(0, services) + if service_name != 'unknown': + self.create_service(services, service_name, service_elt, servicelists, upgrade_help) + if constraints_obj is None: + constraints_obj = SubElement(root, 'constraints') + if variables_auto_valid_enum: + for type, variables_valid_enum in variables_auto_valid_enum.items(): + valid_enum = SubElement(constraints_obj, 'check') + valid_enum.attrib['name'] = 'valid_enum' + for val in type.split('/'): + param = SubElement(valid_enum, 'param') + param.text = val + for variable in variables_valid_enum: + target = SubElement(valid_enum, 'target') + target.text = variable + for name, text in variables_help.items(): + variables[name].attrib['help'] = text + for name, text in families_help.items(): + families[name].attrib['help'] = text + for auto in autos: + if auto in variables: + variables[auto].attrib['hidden'] = 'True' + else: + # redefine value in first family + family = next(iter(families.values())) + variable = SubElement(family, 'variable', name=auto, redefine="True", hidden="True") + if self.test: + del variable.attrib['redefine'] + return root + + @staticmethod + def move(elt, src, dst, optional=False): + if src == 'text': + value = elt.text + elt.text = None + else: + if optional and src not in elt.attrib: + return + value = elt.attrib[src] + del elt.attrib[src] + # + if dst == 'text': + elt.text = value + else: + elt.attrib[dst] = value + + @staticmethod + def remove(elt, src, optional=False): + if optional and src not in elt.attrib: + return + del elt.attrib[src] + + @staticmethod + def create_service(services, service_name, service_elt, servicelists, upgrade_help): + if service_name in service_elt: + return service_elt[service_name] + service = SubElement(services, 'service') + service.attrib['name'] = service_name + if service_name == 'unknown': + service.attrib['manage'] = 'False' + service_elt[service_name] = service + if upgrade_help.get('servicelists', {}).get(service_name): + service.attrib['servicelist'] = upgrade_help.get('servicelists', {}).get(service_name) + elif service_name in servicelists: + service.attrib['servicelist'] = servicelists[service_name] + return service + + def upgrade_container(self, elt, current_service, files, ips, servicelists, upgrade_help): + if elt.tag == 'file': + self.move(elt, 'name', 'text') + self.remove(elt, 'del_comment', optional=True) + elt.attrib['engine'] = 'creole_legacy' + if (not 'instance_mode' in elt.attrib or elt.attrib['instance_mode'] != 'when_container') and \ + elt.text not in upgrade_help.get('files', {}).get('remove', {}): + files[current_service][elt.text] = elt + elif elt.tag in ['host', 'disknod', 'fstab', 'interface', 'package', 'service_access']: + pass + elif elt.tag == 'service_restriction': + for restriction in elt: + if restriction.tag == 'ip' and restriction.text != '0.0.0.0': + self.remove(restriction, 'ip_type', optional=True) + self.remove(restriction, 'netmask_type', optional=True) + ips[current_service].append(restriction) + elif elt.tag == 'service': + new_name = elt.text + if current_service == 'unknown': + if new_name in files: + raise Exception('hu?') + files[new_name] = files[current_service] + del files[current_service] + ips[new_name] = ips[current_service] + del ips[current_service] + elif new_name not in files: + files[new_name] = {} + ips[new_name] = [] + current_service = new_name + if 'servicelist' in elt.attrib: + servicelists[current_service] = elt.attrib['servicelist'] + else: + raise Exception(f'unknown containers tag {elt.tag}') + return current_service + + +def is_empty(elt, not_check_attrib=False): + if not not_check_attrib and elt.attrib: + return False + empty = True + for file_ in elt: + empty = False + return empty + + +def update_creole_1(): + print('pfff')