497 lines
18 KiB
Python
497 lines
18 KiB
Python
"""loader
|
|
Builds the Python source text that declares Tiramisu options from a flattened XML tree.
|
|
"""
|
|
from os.path import isfile
|
|
from lxml.etree import DTD
|
|
|
|
from .config import dtdfilename, variable_namespace
|
|
from .i18n import _
|
|
from .error import LoaderError
|
|
|
|
|
|
# Names consulted by Variable.populate_param() to decide the `todict` flag of
# the generated ParamOption/ParamDynOption parameters.
FUNC_TO_DICT = ['valid_not_equal']
|
|
# Variable attributes that Variable.populate_attrib() stores as option
# informations instead of passing them as constructor keywords.
FORCE_INFORMATIONS = ['help', 'test', 'separator']
|
|
|
|
|
|
def convert_boolean(value):
    """Convert the textual form of a boolean into the matching Python object.

    :param value: one of the strings ``'True'``, ``'False'`` or ``'None'``
    :returns: ``True``, ``False`` or ``None`` respectively
    :raises ValueError: if `value` is not one of the supported strings
    """
    prop = {'True': True,
            'False': False,
            'None': None}
    if value not in prop:
        # The message used to format an undefined name (`obj`), which turned
        # every invalid value into a NameError instead of a readable error.
        raise ValueError('unknown value {} while trying to cast to boolean'.format(value))
    return prop[value]
|
|
|
|
|
|
def convert_tiramisu_value(value, type_):
    """Cast a value to the Python type registered for a variable type.

    The cast function is the optional ``func`` entry of ``CONVERT_OPTION[type_]``.

    :param value: a single value, a list of values, or ``None``
    :param type_: key of ``CONVERT_OPTION`` describing the variable type
    :returns: `value` unchanged when it is ``None`` or when the type has no
              cast function; otherwise the cast value (lists are cast
              element by element)
    """
    if value is None:
        return value
    caster = CONVERT_OPTION[type_].get('func')
    if caster is None:
        return value
    if not isinstance(value, list):
        return caster(value)
    return [caster(item) for item in value]
|
|
|
|
|
|
# Maps every variable type to the description of the option to generate:
#   opttype    -- name of the option class written in the generated code
#   initkwargs -- optional extra keyword arguments for that class
#   func       -- optional callable used to cast textual values
CONVERT_OPTION = {
    'number': {'opttype': "IntOption", 'func': int},
    'choice': {'opttype': "ChoiceOption"},
    'string': {'opttype': "StrOption"},
    'password': {'opttype': "PasswordOption"},
    'mail': {'opttype': "EmailOption"},
    'boolean': {'opttype': "BoolOption",
                'initkwargs': {'default': [True]},
                'func': convert_boolean},
    'symlink': {'opttype': "SymLinkOption"},
    'filename': {'opttype': "FilenameOption"},
    'date': {'opttype': "DateOption"},
    'unix_user': {'opttype': "UsernameOption"},
    'ip': {'opttype': "IPOption",
           'initkwargs': {'allow_reserved': True}},
    'local_ip': {'opttype': "IPOption",
                 'initkwargs': {'private_only': True, 'warnings_only': True}},
    'netmask': {'opttype': "NetmaskOption"},
    'network': {'opttype': "NetworkOption"},
    'broadcast': {'opttype': "BroadcastOption"},
    'netbios': {'opttype': "DomainnameOption",
                'initkwargs': {'type': 'netbios', 'warnings_only': True}},
    'domain': {'opttype': "DomainnameOption",
               'initkwargs': {'type': 'domainname', 'allow_ip': True, 'allow_without_dot': True}},
    'domain_strict': {'opttype': "DomainnameOption",
                      'initkwargs': {'type': 'domainname', 'allow_ip': False}},
    'hostname': {'opttype': "DomainnameOption",
                 'initkwargs': {'type': 'hostname', 'allow_ip': True}},
    'hostname_strict': {'opttype': "DomainnameOption",
                        'initkwargs': {'type': 'hostname', 'allow_ip': False}},
    'web_address': {'opttype': "URLOption",
                    'initkwargs': {'allow_ip': True, 'allow_without_dot': True}},
    'port': {'opttype': "PortOption",
             'initkwargs': {'allow_private': True}},
    'mac': {'opttype': "MACOption"},
    'cidr': {'opttype': "IPOption",
             'initkwargs': {'cidr': True}},
    'network_cidr': {'opttype': "NetworkOption",
                     'initkwargs': {'cidr': True}},
}
|
|
|
|
|
|
class PopulateTiramisuObjects:
    """Walk a flattened XML tree and produce the Python source declaring the
    corresponding Tiramisu options.

    The generated lines are accumulated in ``self.storage.text`` and can be
    retrieved as a single string with :meth:`get_text`.
    """

    def __init__(self,
                 xmlroot,
                 funcs_path,
                 ):
        """Build every element of `xmlroot` and generate the code.

        :param xmlroot: iterable of top-level XML elements
        :param funcs_path: path of the functions file, written verbatim in
                           the header of the generated code
        """
        boolean_attributes = self.parse_dtd()
        self.storage = ElementStorage(boolean_attributes)
        # Header of the generated module.
        # NOTE(review): `imp` was removed from the standard library in
        # Python 3.12; the generated code will not import there.
        self.storage.text = [
            "from tiramisu import *",
            "from rougail.tiramisu import ConvertDynOptionDescription",
            "import imp",
            f"func = imp.load_source('func', '{funcs_path}')",
        ]
        self.make_tiramisu_objects(xmlroot)
        # resolving the root family generates the code of the whole tree
        root = self.storage.get('.')
        root.get()

    def parse_dtd(self):
        """Load the DTD and list the boolean attributes of `variable`.

        The DTD is read from the module-level ``dtdfilename``.

        :returns: names of the `variable` attributes whose allowed values
                  are exactly ``True`` and ``False``
        :raises IOError: if the DTD file is not found
        """
        if not isfile(dtdfilename):
            raise IOError(_("no such DTD file: {}").format(dtdfilename))
        boolean_values = {'True', 'False'}
        with open(dtdfilename, 'r') as dtdfd:
            dtd = DTD(dtdfd)
            return [attr.name
                    for elt in dtd.iterelements()
                    if elt.name == 'variable'
                    for attr in elt.iterattributes()
                    if set(attr.itervalues()) == boolean_values]

    def make_tiramisu_objects(self,
                              xmlroot,
                              ):
        """Attach every top-level element of `xmlroot` to the root family."""
        root_family = self.get_root_family()
        for top_level in self.reorder_family(xmlroot):
            self.iter_family(top_level, root_family, None)

    def get_root_family(self):
        """Create the root family, registered in the storage under ``'.'``."""
        root_elt = BaseElt({'name': 'baseoption',
                            'doc': 'baseoption'})
        return Family(root_elt, self.storage, False, '.')

    def reorder_family(self, xmlroot):
        """Return the elements of `xmlroot` with the namespace family first.

        The `variable_namespace` family has to be loaded before any other
        family because an `extra` family could use its variables.
        """
        ordered = []
        for xmlelt in xmlroot:
            if xmlelt.attrib['name'] == variable_namespace:
                position = 0
            else:
                position = len(ordered)
            ordered.insert(position, xmlelt)
        return ordered

    def iter_family(self,
                    child,
                    family,
                    subpath,
                    ):
        """Dispatch `child` to the populate method matching its tag.

        :raises Exception: if the tag is not family, leader, variable or
                           property
        """
        tag = child.tag
        if tag == 'property':
            # property already imported with family
            return
        if tag in ('family', 'leader'):
            handler = self.populate_family
        elif tag == 'variable':
            handler = self.populate_variable
        else:
            raise Exception('unknown tag {}'.format(child.tag))
        handler(family, child, subpath)

    def populate_family(self,
                        parent_family,
                        elt,
                        subpath,
                        ):
        """Create the family for `elt`, attach it to `parent_family` and
        process its children."""
        path = self.build_path(subpath, elt)
        family = Family(elt, self.storage, elt.tag == 'leader', path)
        parent_family.add(family)
        if len(elt) > 0:
            for child in elt:
                self.iter_family(child, family, path)

    def populate_variable(self,
                          family,
                          elt,
                          subpath,
                          ):
        """Create the variable for `elt` and attach it to `family`.

        Inside a leader family, the variable named like the family is the
        leader and every other variable is a follower.
        """
        is_leader = is_follower = False
        if family.is_leader:
            is_leader = elt.attrib['name'] == family.elt.attrib['name']
            is_follower = not is_leader
        path = self.build_path(subpath, elt)
        family.add(Variable(elt, self.storage, is_follower, is_leader, path))

    def build_path(self,
                   subpath,
                   elt,
                   ):
        """Return the dotted path of `elt` below `subpath` (or its bare name
        when `subpath` is None)."""
        name = elt.attrib['name']
        return name if subpath is None else subpath + '.' + name

    def get_text(self):
        """Return the generated code as a single newline-joined string."""
        root = self.storage.get('.')
        return '\n'.join(root.get_text())
|
|
|
|
|
|
class BaseElt:
    """Minimal stand-in for an XML element: attributes only, no children."""

    def __init__(self, attrib):
        """:param attrib: mapping exposed as the element attributes"""
        self.attrib = attrib

    def __iter__(self):
        # there is never any child to iterate over
        return iter(())
|
|
|
|
|
|
class ElementStorage:
    """Registry of the built elements, indexed by path.

    Every registered element receives an increasing index used to name the
    matching variable in the generated code (``option_<index>``).
    """

    def __init__(self,
                 booleans,
                 ):
        """:param booleans: names of the attributes holding boolean values"""
        self.booleans = booleans
        self.index = 0
        self.text = []
        self.paths = {}

    def add(self, path, elt):
        """Register `elt` under `path` with the next free index."""
        self.paths[path] = (elt, self.index)
        self.index += 1

    def get(self, path):
        """Return the element registered under `path`.

        :raises LoaderError: if nothing (or None) is registered for `path`
        """
        entry = self.paths.get(path)
        if entry is None or entry[0] is None:
            raise LoaderError(_('there is no element for path {}').format(path))
        return entry[0]

    def get_name(self, path):
        """Return the generated variable name of the element at `path`.

        :raises LoaderError: if `path` is not registered
        """
        entry = self.paths.get(path)
        if entry is None:
            raise LoaderError(_('there is no element for index {}').format(path))
        return f'option_{entry[1]}'
|
|
|
|
|
|
class Common:
    """Behaviour shared by the elements that generate Tiramisu code."""

    def __init__(self,
                 elt,
                 storage,
                 is_leader,
                 path,
                 ):
        """Initialise the common state and register the object in `storage`.

        :param elt: source element (not used here; subclasses keep it)
        :param storage: storage collecting the elements and generated text
        :param is_leader: leader flag of the element
        :param path: path under which the object is registered
        """
        self.option_name = None
        self.path = path
        self.attrib = {}
        self.informations = {}
        self.storage = storage
        self.is_leader = is_leader
        self.storage.add(self.path, self)

    def populate_properties(self, child):
        """Append the property described by `child` to ``attrib['properties']``.

        A property of type `calculation` becomes a ``Calculation(calc_value,
        ...)`` expression conditioned on the `source` option; any other
        property becomes a quoted string.
        """
        if child.get('type') == 'calculation':
            action = f"ParamValue('{child.text}')"
            # resolving the source option generates its code first
            option_name = self.storage.get(child.attrib['source']).get()
            kwargs = f"'condition': ParamOption({option_name}, todict=True), 'expected': ParamValue('{child.attrib.get('expected')}')"
            if child.attrib['inverse'] == 'True':
                kwargs += ", 'reverse_condition': ParamValue(True)"
            prop = 'Calculation(calc_value, Params(' + action + ', kwargs={' + kwargs + '}))'
        else:
            prop = "'" + child.text + "'"
        if self.attrib['properties']:
            self.attrib['properties'] += ', '
        self.attrib['properties'] += prop
        if not self.attrib['properties']:
            del self.attrib['properties']

    def get_attrib(self, attrib):
        """Render ``self.attrib`` as a ``key=value, ...`` argument list.

        :param attrib: unused; ``self.attrib`` is always rendered (parameter
                       kept for compatibility with the callers)
        :returns: the keyword arguments as source text
        """
        # values of these keys are already source text or Python objects
        raw_keys = ('default', 'multi', 'suffixes', 'validators')
        ret_list = []
        for key, value in self.attrib.items():
            if key == 'properties':
                value = 'frozenset([' + value + '])'
            elif key in raw_keys:
                pass
            elif isinstance(value, str) and key != 'opt' and not value.startswith('['):
                # plain string: emit it as a single-quoted literal
                value = "'" + value.replace("'", "\\'") + "'"
            ret_list.append(f'{key}={value}')
        return ', '.join(ret_list)

    def populate_informations(self):
        """Emit one ``impl_set_information`` call per stored information."""
        for key, value in self.informations.items():
            # Escape double quotes so the value stays inside the generated
            # double-quoted literal. The previous replacement used '\"',
            # which is just '"', so nothing was ever escaped.
            value = value.replace('"', '\\"')
            self.storage.text.append(f'{self.option_name}.impl_set_information("{key}", "{value}")')

    def get_text(self,
                 ):
        """Return the list of generated lines held by the storage."""
        return self.storage.text
|
|
|
|
|
|
class Variable(Common):
    """Generate the code declaring one Tiramisu option from a `variable`
    element."""

    def __init__(self,
                 elt,
                 storage,
                 is_follower,
                 is_leader,
                 path,
                 ):
        """Prepare the option description of `elt`.

        The `type` attribute is consumed (removed from `elt`) to select the
        option class and its extra keyword arguments in ``CONVERT_OPTION``.

        :param elt: variable element; must carry a `type` attribute
        :param storage: storage collecting the elements and generated text
        :param is_follower: True if the variable is a follower
        :param is_leader: True if the variable is the leader of its family
        :param path: path under which the variable is registered
        """
        # local import: the only place this module needs `copy`
        from copy import deepcopy

        super().__init__(elt,
                         storage,
                         is_leader,
                         path,
                         )
        self.is_follower = is_follower
        convert_option = CONVERT_OPTION[elt.attrib['type']]
        del elt.attrib['type']
        self.object_type = convert_option['opttype']
        # Copy the keyword arguments: they may hold mutable values (the
        # boolean `default` list) that populate_value() appends to. Without
        # the copy, values leaked from one variable to the next through the
        # shared CONVERT_OPTION table.
        self.attrib.update(deepcopy(convert_option.get('initkwargs', {})))
        if self.object_type != 'SymLinkOption':
            self.attrib['properties'] = ''
            self.attrib['validators'] = []
        self.elt = elt

    def get(self):
        """Generate the option code (once) and return its variable name."""
        if self.option_name is None:
            self.populate_attrib()
            if self.object_type == 'SymLinkOption':
                # a symlink only references the target option
                self.attrib['opt'] = self.storage.get(self.attrib['opt']).get()
            else:
                self.parse_multi()
                self.parse_children()
            attrib = self.get_attrib(self.attrib)
            self.option_name = self.storage.get_name(self.path)
            self.storage.text.append(f'{self.option_name} = {self.object_type}({attrib})')
            self.populate_informations()
        return self.option_name

    def populate_attrib(self):
        """Split the element attributes between option keywords and
        informations, casting the boolean ones."""
        for key, value in self.elt.attrib.items():
            if key in self.storage.booleans:
                value = convert_boolean(value)
            if key in FORCE_INFORMATIONS:
                self.informations[key] = value
            else:
                self.attrib[key] = value

    def parse_children(self):
        """Process the child elements (property, value, check, choice) and
        finalise the `default`, `values` and `validators` keywords."""
        if 'default' not in self.attrib or self.attrib['multi']:
            self.attrib['default'] = []
        if self.attrib['multi'] == 'submulti' and self.is_follower:
            self.attrib['default_multi'] = []
        choices = []
        for child in self.elt:
            if child.tag == 'property':
                self.populate_properties(child)
            elif child.tag == 'value':
                if child.attrib['type'] == 'calculation':
                    self.attrib['default'] = self.calculation_value(child, [])
                else:
                    self.populate_value(child)
            elif child.tag == 'check':
                # a validator always receives the option itself first
                self.attrib['validators'].append(self.calculation_value(child, ['ParamSelfOption()']))
            elif child.tag == 'choice':
                choices.append(convert_tiramisu_value(child.text, child.attrib['type']))
        if choices:
            self.attrib['values'] = tuple(choices)
        if self.attrib['default'] == []:
            del self.attrib['default']
        elif not self.attrib['multi'] and isinstance(self.attrib['default'], list):
            # single-valued option: the last declared value wins
            self.attrib['default'] = self.attrib['default'][-1]
        if self.attrib['validators'] == []:
            del self.attrib['validators']
        else:
            self.attrib['validators'] = '[' + ', '.join(self.attrib['validators']) + ']'

    def parse_multi(self):
        """Adjust `multi` for followers: a multi follower becomes
        `submulti`, any other follower becomes multi."""
        if self.is_follower:
            if self.attrib['multi'] is True:
                self.attrib['multi'] = 'submulti'
            else:
                self.attrib['multi'] = True

    def calculation_value(self, child, args):
        """Return the source of the ``Calculation`` described by `child`.

        :param child: element naming the function, either through its `name`
                      attribute (with `param` children) or through its text
        :param args: source text of the leading positional parameters
        :returns: the ``Calculation(...)`` expression as a string
        """
        # work on a copy so the caller's list is left untouched
        args = list(args)
        kwargs = []
        if 'name' in child.attrib:
            # has parameters
            function = child.attrib['name']
            for param in child:
                value = self.populate_param(param, function)
                if 'name' not in param.attrib:
                    args.append(str(value))
                else:
                    kwargs.append(f"'{param.attrib['name']}': " + value)
        else:
            # function without any parameter
            function = child.text.strip()
        ret = f"Calculation(func.{function}, Params((" + ', '.join(args) + "), kwargs=" + "{" + ', '.join(kwargs) + "})"
        if 'warnings_only' in child.attrib:
            ret += f', warnings_only={child.attrib["warnings_only"]}'
        return ret + ')'

    def populate_param(self,
                       param,
                       function=None,
                       ):
        """Return the source of one parameter of a calculation.

        :param param: `param` element of type string, number or variable
        :param function: name of the function receiving the parameter; it
                         decides the `todict` flag of variable parameters.
                         `todict` is False when it is omitted.
        :raises LoaderError: if the parameter type is unknown
        """
        if param.attrib['type'] == 'string':
            # NOTE(review): the text is not escaped; a double quote in it
            # yields invalid generated code.
            return f'ParamValue("{param.text}")'
        elif param.attrib['type'] == 'number':
            return f'ParamValue({param.text})'
        elif param.attrib['type'] == 'variable':
            value = {'option': param.text,
                     'notraisepropertyerror': convert_boolean(param.attrib['notraisepropertyerror']),
                     # FUNC_TO_DICT lists function names, so the function is
                     # what must be tested (the variable path used to be
                     # compared, which never matched)
                     'todict': function in FUNC_TO_DICT,
                     }
            if 'suffix' in param.attrib:
                value['suffix'] = param.attrib['suffix']
            return self.build_param(value)
        raise LoaderError(_('unknown param type {}').format(param.attrib['type']))

    def populate_value(self,
                       child,
                       ):
        """Store the value of a `value` element in `default` and/or
        `default_multi`, depending on the multi/follower/leader status."""
        value = convert_tiramisu_value(child.text, child.attrib['type'])
        if self.attrib['multi'] == 'submulti':
            self.attrib['default_multi'].append(value)
        elif self.is_follower:
            self.attrib['default_multi'] = value
        elif self.attrib['multi']:
            self.attrib['default'].append(value)
            if not self.is_leader:
                self.attrib['default_multi'] = value
        elif isinstance(value, int):
            self.attrib['default'].append(value)
        else:
            # non-multi textual value: stored as an already quoted literal
            self.attrib['default'].append("'" + value + "'")

    def build_param(self,
                    param,
                    ):
        """Return the ``ParamOption``/``ParamDynOption`` source for the
        variable parameter described by the `param` dict."""
        option_name = self.storage.get(param['option']).get()
        if 'suffix' in param:
            # the dynamic family is the parent path of the option
            family = '.'.join(param['option'].split('.')[:-1])
            family_option = self.storage.get_name(family)
            return f"ParamDynOption({option_name}, '{param['suffix']}', {family_option}, notraisepropertyerror={param['notraisepropertyerror']}, todict={param['todict']})"
        return f"ParamOption({option_name}, notraisepropertyerror={param['notraisepropertyerror']}, todict={param['todict']})"
|
|
|
|
|
|
class Family(Common):
    """Generate the code declaring a family (an option description) and
    everything it contains."""

    def __init__(self,
                 elt,
                 storage,
                 is_leader,
                 path,
                 ):
        """
        :param elt: family element (attributes and child elements)
        :param storage: storage collecting the elements and generated text
        :param is_leader: True if the family is a leader family
        :param path: path under which the family is registered
        """
        super().__init__(elt, storage, is_leader, path)
        self.children = []
        self.elt = elt

    def add(self, child):
        """Attach `child` (a family or a variable) to this family."""
        self.children.append(child)

    def get(self):
        """Generate the family code (once) and return its variable name."""
        if self.option_name:
            return self.option_name
        self.populate_attrib()
        self.parse_children()
        self.option_name = self.storage.get_name(self.path)
        object_name = self.get_object_name()
        # own attributes are rendered before the children are resolved;
        # resolving a child appends its code ahead of the family's line
        own_attrib = self.get_attrib(self.attrib)
        children_names = ', '.join([child.get() for child in self.children])
        attrib = own_attrib + ', children=[' + children_names + ']'
        self.storage.text.append(f'{self.option_name} = {object_name}({attrib})')
        self.populate_informations()
        return self.option_name

    def populate_attrib(self):
        """Split the element attributes between informations (`help`),
        suffixes (`dynamic`) and plain keywords."""
        for key, value in self.elt.attrib.items():
            if key == 'help':
                self.informations[key] = value
                continue
            if key == 'dynamic':
                # resolving the option providing the suffixes generates it
                dynamic = self.storage.get(value).get()
                self.attrib['suffixes'] = f"Calculation(func.calc_value, Params((ParamOption({dynamic}))))"
                continue
            self.attrib[key] = value

    def parse_children(self):
        """Collect the `property` children into ``attrib['properties']``."""
        self.attrib['properties'] = ''
        for child in self.elt:
            if child.tag != 'property':
                continue
            self.populate_properties(child)
        if not self.attrib['properties']:
            del self.attrib['properties']

    def get_object_name(self):
        """Return the name of the class to instantiate for this family."""
        if 'suffixes' in self.attrib:
            return 'ConvertDynOptionDescription'
        if self.is_leader:
            return 'Leadership'
        return 'OptionDescription'
|
|
|
|
|
|
def load(xmlroot: str,
         funcs_path: str):
    """Return the generated Tiramisu code for `xmlroot`.

    :param xmlroot: root of the flattened XML, handed to
                    PopulateTiramisuObjects, which iterates over its
                    elements despite the `str` annotation
    :param funcs_path: path of the functions file referenced by the
                       generated code
    :returns: the generated code as a single string
    """
    return PopulateTiramisuObjects(xmlroot, funcs_path).get_text()
|