rougail/src/rougail/loader.py

547 lines
21 KiB
Python

"""loader
Build tiramisu option objects from a flattened Creole XML tree.
"""
from os.path import join, isfile
from lxml.etree import DTD
import imp
from tiramisu import (StrOption, OptionDescription, DynOptionDescription, PortOption,
IntOption, ChoiceOption, BoolOption, SymLinkOption, IPOption,
NetworkOption, NetmaskOption, DomainnameOption, BroadcastOption,
URLOption, EmailOption, FilenameOption, UsernameOption, DateOption,
PasswordOption, BoolOption, MACOption, Leadership, submulti,
Params, ParamSelfOption, ParamOption, ParamDynOption, ParamValue, Calculation, calc_value)
from .config import dtdfilename
from .i18n import _
from .utils import normalize_family
from .annotator import VARIABLE_NAMESPACE
from .error import CreoleLoaderError
# Names looked up by Variable.parse_param to decide the 'todict' flag of a
# 'variable' parameter.
FUNC_TO_DICT = ['valid_not_equal']
class ConvertDynOptionDescription(DynOptionDescription):
    """Dynamic option description whose suffixes are normalised for paths."""

    def convert_suffix_to_path(self, suffix):
        """Return *suffix* as a normalised path component.

        A non-string suffix is stringified first; the text is then handed to
        ``normalize_family`` with name checking disabled.
        """
        text = suffix if isinstance(suffix, str) else str(suffix)
        return normalize_family(text, check_name=False)
def convert_boolean(value):
    """Convert the textual flag ``'True'``, ``'False'`` or ``'None'``.

    :param value: one of the strings ``'True'``, ``'False'`` or ``'None'``
    :returns: ``True``, ``False`` or ``None`` respectively
    :raises ValueError: if *value* is not one of the three known strings
    """
    known = {'True': True,
             'False': False,
             'None': None}
    if value not in known:
        # Only the offending value is reported: no other context (such as the
        # owning object) is available inside this helper.
        raise ValueError('unknown value {} while trying to cast to boolean'.format(value))
    return known[value]
def convert_tiramisu_value(value, type_):
    """Cast *value* to the Python type expected for *type_*, when needed.

    ``None`` is returned untouched, before *type_* is even looked up.
    Otherwise the converter stored under the ``'func'`` key of
    ``CONVERT_OPTION[type_]`` is applied, element by element when *value* is
    a list. Types without a converter leave *value* unchanged.
    """
    if value is None:
        return value
    converter = CONVERT_OPTION[type_].get('func')
    if converter is None:
        return value
    if not isinstance(value, list):
        return converter(value)
    return [converter(item) for item in value]
# Maps each variable type name to a descriptor holding:
#   'opttype'    - the tiramisu option class instantiated for that type
#   'initkwargs' - optional keyword arguments merged into the option attributes
#   'func'       - optional converter applied by convert_tiramisu_value
CONVERT_OPTION = {
    'number': {'opttype': IntOption, 'func': int},
    'choice': {'opttype': ChoiceOption},
    'string': {'opttype': StrOption},
    'password': {'opttype': PasswordOption},
    'mail': {'opttype': EmailOption},
    'boolean': {
        'opttype': BoolOption,
        'initkwargs': {'default': True},
        'func': convert_boolean,
    },
    'symlink': {'opttype': SymLinkOption},
    'filename': {'opttype': FilenameOption},
    'date': {'opttype': DateOption},
    'unix_user': {'opttype': UsernameOption},
    'ip': {
        'opttype': IPOption,
        'initkwargs': {'allow_reserved': True},
    },
    'local_ip': {
        'opttype': IPOption,
        'initkwargs': {'private_only': True, 'warnings_only': True},
    },
    'netmask': {'opttype': NetmaskOption},
    'network': {'opttype': NetworkOption},
    'broadcast': {'opttype': BroadcastOption},
    'netbios': {
        'opttype': DomainnameOption,
        'initkwargs': {'type': 'netbios', 'warnings_only': True},
    },
    'domain': {
        'opttype': DomainnameOption,
        'initkwargs': {'type': 'domainname', 'allow_ip': True, 'allow_without_dot': True},
    },
    'domain_strict': {
        'opttype': DomainnameOption,
        'initkwargs': {'type': 'domainname', 'allow_ip': False},
    },
    'hostname': {
        'opttype': DomainnameOption,
        'initkwargs': {'type': 'hostname', 'allow_ip': True},
    },
    'hostname_strict': {
        'opttype': DomainnameOption,
        'initkwargs': {'type': 'hostname', 'allow_ip': False},
    },
    'web_address': {
        'opttype': URLOption,
        'initkwargs': {'allow_ip': True, 'allow_without_dot': True},
    },
    'port': {
        'opttype': PortOption,
        'initkwargs': {'allow_private': True},
    },
    'mac': {'opttype': MACOption},
    'cidr': {
        'opttype': IPOption,
        'initkwargs': {'cidr': True},
    },
    'network_cidr': {
        'opttype': NetworkOption,
        'initkwargs': {'cidr': True},
    },
}
class BaseElt:
    """Minimal element stand-in that only carries an ``attrib`` mapping."""

    def __init__(self, attrib):
        """Keep *attrib* as this element's attribute mapping."""
        self.attrib = attrib
class Common:
    """Property handling shared by the classes that derive from it.

    Subclasses are expected to provide ``self.attrib`` (with a
    ``'properties'`` list) and ``self.storage``.
    """

    def populate_properties(self, child):
        """Record the property described by *child* in ``attrib['properties']``.

        A plain property is stored as its text. A property whose ``type`` is
        ``'calculation'`` is stored as an ``(action, kwargs)`` tuple that
        ``build_properties`` later turns into a ``Calculation``.
        """
        properties = self.attrib['properties']
        if child.get('type') != 'calculation':
            properties.append(child.text)
            return
        kwargs = {
            'condition': child.attrib['source'],
            'expected': ParamValue(child.attrib.get('expected')),
        }
        if child.attrib['inverse'] == 'True':
            kwargs['reverse_condition'] = ParamValue(True)
        properties.append((ParamValue(child.text), kwargs))

    def build_properties(self):
        """Finalise ``attrib['properties']`` for option construction.

        Each pending ``(action, kwargs)`` tuple is replaced by a
        ``Calculation`` on ``calc_value`` whose ``condition`` is resolved
        through ``self.storage``. The list then becomes a tuple, or the key
        is removed altogether when there is no property.
        """
        properties = self.attrib['properties']
        for position, entry in enumerate(properties):
            if not isinstance(entry, tuple):
                continue
            action, kwargs = entry
            # the condition is stored as a path until the target can be built
            target = self.storage.get(kwargs['condition']).get()
            kwargs['condition'] = ParamOption(target, todict=True)
            properties[position] = Calculation(calc_value,
                                               Params(action,
                                                      kwargs=kwargs))
        if not properties:
            del self.attrib['properties']
            return
        self.attrib['properties'] = tuple(properties)
class PopulateTiramisuObjects:
    """Walk a flattened XML tree and register a wrapper for every element.

    Every family, leader and variable found is wrapped in a ``Family`` or
    ``Variable`` object and recorded in ``self.storage`` under its dotted
    path; the root family is recorded under ``'.'``.
    """

    def __init__(self):
        self.storage = ElementStorage()
        # names of <variable> attributes whose DTD values are True/False
        self.booleans = []

    def parse_dtd(self, dtdfilename):
        """Loads the Creole DTD

        Collects in ``self.booleans`` the name of every attribute of the
        ``variable`` element whose allowed values are exactly ``True`` and
        ``False``.

        :raises IOError: if the DTD is not found
        :param dtdfilename: the full filename of the Creole DTD
        """
        if not isfile(dtdfilename):
            raise IOError(_("no such DTD file: {}").format(dtdfilename))
        with open(dtdfilename, 'r') as dtdfd:
            dtd = DTD(dtdfd)
        for elt in dtd.iterelements():
            if elt.name != 'variable':
                continue
            for attr in elt.iterattributes():
                if set(attr.itervalues()) == {'True', 'False'}:
                    self.booleans.append(attr.name)

    @staticmethod
    def _load_eosfunc(path):
        """Import the Python source file *path* as the module ``eosfunc``.

        importlib is used because ``imp.load_source`` is deprecated and was
        removed in Python 3.12. As with ``imp.load_source`` the module is
        registered in ``sys.modules`` under the name ``eosfunc``.

        :param path: filename of the Python source to load
        :returns: the loaded module
        """
        import sys
        from importlib.machinery import SourceFileLoader
        from importlib.util import module_from_spec, spec_from_loader

        loader = SourceFileLoader('eosfunc', path)
        spec = spec_from_loader('eosfunc', loader)
        module = module_from_spec(spec)
        sys.modules['eosfunc'] = module
        try:
            loader.exec_module(module)
        except BaseException:
            # do not leave a half-initialised module registered
            sys.modules.pop('eosfunc', None)
            raise
        return module

    def make_tiramisu_objects(self, xmlroot, eosfunc):
        """Load the functions file and wrap every element below *xmlroot*.

        :param xmlroot: iterable of top-level XML elements
        :param eosfunc: filename of the Python file providing the functions
        """
        self.eosfunc = self._load_eosfunc(eosfunc)
        family = self.get_root_family()
        for xmlelt in self.reorder_family(xmlroot):
            self.iter_family(xmlelt,
                             family,
                             None,
                             )

    def get_root_family(self):
        """Create the ``baseoption`` root family and store it under ``'.'``."""
        family = Family(BaseElt({'name': 'baseoption',
                                 'doc': 'baseoption'}),
                        self.booleans,
                        self.storage,
                        self.eosfunc,
                        )
        self.storage.add('.', family)
        return family

    def reorder_family(self, xmlroot):
        """Return the children of *xmlroot* with the namespace family first."""
        xmlelts = []
        for xmlelt in xmlroot:
            # VARIABLE_NAMESPACE family has to be loaded before any other family
            # because `extra` family could use `VARIABLE_NAMESPACE` variables.
            if xmlelt.attrib['name'] == VARIABLE_NAMESPACE:
                xmlelts.insert(0, xmlelt)
            else:
                xmlelts.append(xmlelt)
        return xmlelts

    def iter_family(self,
                    child,
                    family,
                    subpath,
                    ):
        """Dispatch *child* to the populate method matching its tag.

        :param child: XML element to load
        :param family: ``Family`` wrapper the element belongs to
        :param subpath: dotted path of *family*, or ``None`` at top level
        :raises Exception: if the tag is not family, leader, variable or
                           property
        """
        if child.tag == 'property':
            # property already imported with family
            return
        if child.tag in ['family', 'leader']:
            function = self.populate_family
        elif child.tag == 'variable':
            function = self.populate_variable
        else:
            raise Exception('unknown tag {}'.format(child.tag))
        function(family,
                 child,
                 subpath,
                 )

    def populate_family(self,
                        parent_family,
                        elt,
                        subpath,
                        ):
        """Wrap *elt* in a ``Family``, register it and load its children."""
        path = self.build_path(subpath,
                               elt,
                               )
        family = Family(elt,
                        self.booleans,
                        self.storage,
                        self.eosfunc,
                        elt.tag == 'leader',
                        )
        self.storage.add(path, family)
        parent_family.add(family)
        for child in elt:
            self.iter_family(child,
                             family,
                             path,
                             )

    def populate_variable(self,
                          family,
                          elt,
                          subpath,
                          ):
        """Wrap *elt* in a ``Variable`` and attach it to *family*.

        Inside a leader family, the variable carrying the family's name is
        the leader and every other variable is a follower.
        """
        is_leader = False
        is_follower = False
        if family.is_leader:
            is_leader = elt.attrib['name'] == family.attrib['name']
            is_follower = not is_leader
        path = self.build_path(subpath,
                               elt,
                               )
        variable = Variable(elt,
                            self.booleans,
                            self.storage,
                            is_follower,
                            is_leader,
                            self.eosfunc,
                            )
        self.storage.add(path, variable)
        family.add(variable)

    def build_path(self,
                   subpath,
                   elt,
                   ):
        """Return the dotted path of *elt* below *subpath*.

        When *subpath* is ``None`` the path is just the element name.
        """
        if subpath is None:
            return elt.attrib['name']
        return subpath + '.' + elt.attrib['name']
class ElementStorage:
    """Registry associating a path with the element loaded for it."""

    def __init__(self):
        self.paths = {}

    def add(self, path, elt):
        """Register *elt* under *path*.

        :raises CreoleLoaderError: if *path* is already registered
        """
        known = self.paths
        if path not in known:
            known[path] = elt
            return
        raise CreoleLoaderError(_('path already loaded {}').format(path))

    def get(self, path):
        """Return the element registered under *path*.

        :raises CreoleLoaderError: if nothing is registered under *path*
        """
        known = self.paths
        if path in known:
            return known[path]
        raise CreoleLoaderError(_('there is no element for path {}').format(path))
class Variable(Common):
    """Wrapper collecting the attributes of one variable element.

    The constructor gathers keyword arguments in ``self.attrib``; the
    tiramisu option itself is only created, once, by ``get()``.
    """

    def __init__(self, elt, booleans, storage, is_follower, is_leader, eosfunc):
        """Collect option attributes from *elt*.

        :param elt: the variable XML element
        :param booleans: attribute names whose value is 'True' or 'False'
        :param storage: ``ElementStorage`` used to resolve other elements
        :param is_follower: whether the variable is a follower of a leader
        :param is_leader: whether the variable is the leader of its family
        :param eosfunc: module providing the calculation and check functions
        """
        self.option = None
        self.informations = {}
        self.attrib = {}
        convert_option = CONVERT_OPTION[elt.attrib['type']]
        if 'initkwargs' in convert_option:
            self.attrib.update(convert_option['initkwargs'])
        if elt.attrib['type'] != 'symlink':
            self.attrib['properties'] = []
            self.attrib['validators'] = []
        self.eosfunc = eosfunc
        self.storage = storage
        self.booleans = booleans
        self.populate_attrib(elt)
        self.is_follower = is_follower
        self.is_leader = is_leader
        self.object_type = convert_option['opttype']
        if 'multi' in self.attrib and self.attrib['multi']:
            self.attrib['default'] = []
            if self.is_follower:
                self.attrib['default_multi'] = []
        self.parse_children(elt)
        if self.is_follower:
            # a follower is always multi; a multi follower becomes submulti
            if self.attrib['multi'] is True:
                self.attrib['multi'] = submulti
            else:
                self.attrib['multi'] = True
        if elt.attrib['type'] == 'symlink':
            # replace the target path by its wrapper, resolved later in get()
            self.attrib['opt'] = storage.get(self.attrib['opt'])

    def parse_children(self,
                       elt,
                       ):
        """Load properties, values, checks and choices from *elt*'s children."""
        if elt.attrib['type'] == 'choice':
            values = []
        for child in elt:
            if child.tag == 'property':
                self.populate_properties(child)
            elif child.tag == 'value':
                if child.attrib.get('type') == 'calculation':
                    self.calc_value(child)
                else:
                    self.populate_value(child, elt)
            elif child.tag == 'check':
                self.populate_check(child)
            elif child.tag == 'choice':
                value = child.text
                if child.attrib['type'] == 'number':
                    value = int(value)
                values.append(value)
        if elt.attrib['type'] == 'choice':
            self.attrib['values'] = tuple(values)

    def populate_attrib(self,
                        elt,
                        ):
        """Copy the XML attributes of *elt* into ``attrib`` or informations.

        :raises CreoleLoaderError: if a boolean attribute holds something
                                   other than 'True' or 'False'
        """
        for key, value in elt.attrib.items():
            if key == 'multi' and elt.attrib['type'] == 'symlink':
                continue
            if key in self.booleans:
                if value == 'True':
                    value = True
                elif value == 'False':
                    value = False
                else:
                    raise CreoleLoaderError(_('unknown value {} for {}').format(value, key))
            if key in ['help', 'test', 'separator']:
                self.add_information(key, value)
            elif key != 'type':
                self.attrib[key] = value

    def calc_value(self, child):
        """Store a pending calculation as the default value.

        The function name is the stripped text of *child* when it has any,
        otherwise its ``name`` attribute.
        """
        args = []
        kwargs = {}
        if child.text is not None and child.text.strip():
            function = child.text.strip()
        else:
            function = child.attrib['name']
        for param in child:
            self.parse_param(args,
                             kwargs,
                             param,
                             function,
                             )
        self.attrib['default'] = {'function': getattr(self.eosfunc, function),
                                  'args': args,
                                  'kwargs': kwargs,
                                  'warnings_only': False}

    def populate_value(self, child, elt):
        """Store the converted text of *child* as a default value.

        The value type comes from *child* when it declares one, otherwise
        from *elt*.
        """
        if "type" in child.attrib:
            type_ = child.attrib['type']
        else:
            type_ = elt.attrib['type']
        value = convert_tiramisu_value(child.text, type_)
        if self.is_follower:
            if self.attrib['multi'] is True:
                self.attrib['default_multi'].append(value)
            else:
                self.attrib['default_multi'] = value
        elif self.attrib['multi'] is True:
            self.attrib['default'].append(value)
            if not self.is_leader:
                self.attrib['default_multi'] = value
        else:
            self.attrib['default'] = value

    def populate_check(self, child):
        """Append a pending validator described by *child*."""
        function = child.attrib['name']
        args = [ParamSelfOption()]
        kwargs = {}
        for param in child:
            self.parse_param(args,
                             kwargs,
                             param,
                             function,
                             )
        self.attrib['validators'].append({'function': getattr(self.eosfunc, function),
                                          'args': args,
                                          'kwargs': kwargs,
                                          'warnings_only': convert_boolean(child.attrib['warnings_only'])})

    def parse_param(self,
                    args,
                    kwargs,
                    param,
                    function=None,
                    ):
        """Convert *param* and add it to *args* or *kwargs*.

        A parameter without ``name`` attribute is positional, otherwise it is
        stored in *kwargs* under that name.

        :param args: list receiving positional parameters
        :param kwargs: dict receiving named parameters
        :param param: the param XML element
        :param function: name of the function the parameter is built for;
                         variable parameters get ``todict`` set when this
                         name is listed in ``FUNC_TO_DICT``
        :raises CreoleLoaderError: if the parameter type is unknown
        """
        if param.attrib['type'] == 'string':
            value = ParamValue(param.text)
        elif param.attrib['type'] == 'number':
            value = ParamValue(int(param.text))
        elif param.attrib['type'] == 'variable':
            transitive = convert_boolean(param.attrib.get('transitive', 'False'))
            value = {'option': param.text,
                     'notraisepropertyerror': transitive,
                     # FUNC_TO_DICT lists function names, so the test is on
                     # the function and not on the variable path in the text
                     'todict': function in FUNC_TO_DICT,
                     }
            if 'suffix' in param.attrib:
                value['suffix'] = param.attrib['suffix']
        else:
            raise CreoleLoaderError(_('unknown param type {}').format(param.attrib['type']))
        if 'name' not in param.attrib:
            args.append(value)
        else:
            kwargs[param.attrib['name']] = value

    def build_calculator(self, key):
        """Turn pending calculation dicts under ``attrib[key]`` into objects.

        ``attrib[key]`` may be a single value or a list; every dict found is
        replaced by a ``Calculation``, anything else is kept as is. Nothing
        happens when *key* is absent.
        """
        def build_param(param):
            # resolve the referenced variable to its tiramisu option
            option = self.storage.get(param['option']).get()
            if 'suffix' in param:
                family = '.'.join(param['option'].split('.')[:-1])
                return ParamDynOption(option,
                                      param['suffix'],
                                      self.storage.get(family).get(),
                                      notraisepropertyerror=param['notraisepropertyerror'],
                                      todict=param['todict'],
                                      )
            return ParamOption(option,
                               notraisepropertyerror=param['notraisepropertyerror'],
                               todict=param['todict'],
                               )

        def build_calculation(value):
            args = [build_param(param) if isinstance(param, dict) else param
                    for param in value['args']]
            kwargs = {name: build_param(param) if isinstance(param, dict) else param
                      for name, param in value['kwargs'].items()}
            return Calculation(value['function'],
                               Params(tuple(args),
                                      kwargs=kwargs,
                                      ),
                               warnings_only=value['warnings_only'],
                               )

        if key not in self.attrib:
            return
        values = self.attrib[key]
        if isinstance(values, list):
            self.attrib[key] = [build_calculation(value) if isinstance(value, dict) else value
                                for value in values]
        elif isinstance(values, dict):
            self.attrib[key] = build_calculation(values)

    def add_information(self, key, value):
        """Record the information *key*.

        :raises CreoleLoaderError: if *key* was already recorded
        """
        if key in self.informations:
            raise CreoleLoaderError(_('key already exists in information {}').format(key))
        self.informations[key] = value

    def get(self):
        """Create the tiramisu option on first call and return it.

        :raises CreoleLoaderError: if the option cannot be instantiated; the
                                   original exception is chained as cause
        """
        if self.option is None:
            if self.object_type is SymLinkOption:
                self.attrib['opt'] = self.attrib['opt'].get()
            else:
                self.build_properties()
                self.build_calculator('default')
                self.build_calculator('validators')
                if not self.attrib['validators']:
                    del self.attrib['validators']
            try:
                option = self.object_type(**self.attrib)
            except Exception as err:
                name = self.attrib['name']
                raise CreoleLoaderError(_('cannot create option "{}": {}').format(name, err)) from err
            for key, value in self.informations.items():
                option.impl_set_information(key, value)
            self.option = option
        return self.option
class Family(Common):
    """Wrapper collecting the attributes and children of one family element.

    The tiramisu object itself is only created, once, by ``get()``.
    """

    def __init__(self,
                 elt,
                 booleans,
                 storage,
                 eosfunc,
                 is_leader=False,
                 ):
        """Collect attributes and properties from *elt*.

        :param elt: the family XML element, or a ``BaseElt`` for the root
        :param booleans: accepted like in ``Variable``; not used here
        :param storage: ``ElementStorage`` used to resolve other elements
        :param eosfunc: module providing ``calc_value`` for dynamic families
        :param is_leader: whether the family is a leadership
        """
        self.option = None
        self.attrib = {}
        self.is_leader = is_leader
        self.informations = {}
        self.children = []
        self.storage = storage
        self.eosfunc = eosfunc
        self.attrib['properties'] = []
        for key, value in elt.attrib.items():
            if key == 'help':
                self.add_information(key, value)
            else:
                self.attrib[key] = value
        # a BaseElt carries attributes only and cannot be iterated
        if not isinstance(elt, BaseElt):
            for child in elt:
                if child.tag == 'property':
                    self.populate_properties(child)

    def add(self, child):
        """Append *child* (a ``Family`` or ``Variable``) to this family."""
        self.children.append(child)

    def add_information(self, key, value):
        """Record the information *key*.

        An ``'icon'`` information whose current value is ``None`` may be
        overwritten.

        :raises CreoleLoaderError: if *key* was already recorded
        """
        if key in self.informations and not (key == 'icon' and self.informations[key] is None):
            raise CreoleLoaderError(_('key already exists in information {}').format(key))
        self.informations[key] = value

    def get(self):
        """Create the tiramisu option description on first call and return it.

        A family with a ``dynamic`` attribute becomes a
        ``ConvertDynOptionDescription`` whose suffixes are calculated from
        the referenced variable; a leader family becomes a ``Leadership``;
        any other family becomes an ``OptionDescription``.

        :raises CreoleLoaderError: if the object cannot be instantiated; the
                                   original exception is chained as cause
        """
        if self.option is None:
            self.attrib['children'] = [child.get() for child in self.children]
            self.build_properties()
            try:
                if 'dynamic' in self.attrib:
                    dynamic = self.storage.get(self.attrib['dynamic']).get()
                    del self.attrib['dynamic']
                    self.attrib['suffixes'] = Calculation(self.eosfunc.calc_value,
                                                          Params((ParamOption(dynamic),)))
                    option = ConvertDynOptionDescription(**self.attrib)
                elif not self.is_leader:
                    option = OptionDescription(**self.attrib)
                else:
                    option = Leadership(**self.attrib)
            except Exception as err:
                raise CreoleLoaderError(_('cannot create optiondescription "{}": {}').format(self.attrib['name'], err)) from err
            for key, value in self.informations.items():
                option.impl_set_information(key, value)
            self.option = option
        return self.option
def load(xmlroot: str,
         dtd_path: str,
         funcs_path: str):
    """Build and return the root tiramisu object for *xmlroot*.

    :param xmlroot: root of the flattened XML; it is iterated element by
                    element, so despite the annotation it is presumably an
                    XML element rather than a string (to be confirmed
                    against callers)
    :param dtd_path: filename of the DTD passed to ``parse_dtd``
    :param funcs_path: filename of the Python functions file
    :returns: the result of ``get()`` on the family stored under ``'.'``
    """
    loader = PopulateTiramisuObjects()
    loader.parse_dtd(dtd_path)
    loader.make_tiramisu_objects(xmlroot, funcs_path)
    root_family = loader.storage.paths['.']
    return root_family.get()