creole/creole/loader.py

752 lines
32 KiB
Python

"""creole loader
flattened XML specific
"""
from os.path import join, isfile, isdir
from os import listdir
#from ast import literal_eval
from lxml.etree import parse, DTD
from tiramisu.option import (UnicodeOption, OptionDescription, PortOption,
IntOption, ChoiceOption, BoolOption, SymLinkOption, IPOption,
NetworkOption, NetmaskOption, DomainnameOption, BroadcastOption,
URLOption, EmailOption, FilenameOption, UsernameOption, DateOption,
PasswordOption, BoolOption, MACOption, Leadership)
from tiramisu import Config, MetaConfig, MixConfig
from tiramisu.setting import groups
from tiramisu.error import ConfigError
from tiramisu.setting import owners
from tiramisu import Params, ParamOption, ParamValue, ParamContext, Calculation, calc_value
from .config import dtdfilename
from .i18n import _
#For compatibility
from .xmlreflector import HIGH_COMPATIBILITY
#from . import eosfunc
from .objspace import CreoleObjSpace
import imp
class CreoleLoaderError(Exception):
pass
def convert_tiramisu_value(value, obj):
"""
convertit les variables dans le bon type si nécessaire
"""
if value is None:
return value
def _convert_boolean(value):
if isinstance(value, bool):
return value
prop = {'True': True,
'False': False,
'None': None}
if value not in prop:
raise Exception('unknown value {} while trying to cast {} to boolean'.format(value, obj))
return prop[value]
func = {IntOption: int, UnicodeOption: str, PortOption: str,
DomainnameOption: str, EmailOption: str, URLOption: str,
IPOption: str, NetmaskOption: str, NetworkOption: str,
BroadcastOption: str, FilenameOption: str,
BoolOption: _convert_boolean}.get(obj, None)
if func is None:
return value
if isinstance(value, list):
return [func(val) for val in value]
else:
return func(value)
CONVERT_OPTION = {'number': dict(opttype=IntOption),
'choice': dict(opttype=ChoiceOption),
'string': dict(opttype=UnicodeOption),
'password': dict(opttype=PasswordOption),
'mail': dict(opttype=EmailOption),
'boolean': dict(opttype=BoolOption),
'symlink': dict(opttype=SymLinkOption),
'filename': dict(opttype=FilenameOption),
'date': dict(opttype=DateOption),
'unix_user': dict(opttype=UsernameOption),
'ip': dict(opttype=IPOption, initkwargs={'allow_reserved': True}),
'local_ip': dict(opttype=IPOption, initkwargs={'private_only': True, 'warnings_only': True}),
'netmask': dict(opttype=NetmaskOption),
'network': dict(opttype=NetworkOption),
'broadcast': dict(opttype=BroadcastOption),
'netbios': dict(opttype=DomainnameOption, initkwargs={'type': 'netbios', 'warnings_only': True}),
'domain': dict(opttype=DomainnameOption, initkwargs={'type': 'domainname', 'allow_ip': True, 'allow_without_dot': True}),
'domain_strict': dict(opttype=DomainnameOption, initkwargs={'type': 'domainname', 'allow_ip': False}),
'hostname': dict(opttype=DomainnameOption, initkwargs={'type': 'hostname', 'allow_ip': True}),
'hostname_strict': dict(opttype=DomainnameOption, initkwargs={'type': 'hostname', 'allow_ip': False}),
'web_address': dict(opttype=URLOption, initkwargs={'allow_ip': True, 'allow_without_dot': True}),
'port': dict(opttype=PortOption, initkwargs={'allow_private': True}),
'mac': dict(opttype=MACOption) # FIXME YO
}
# FIXME help
REMOVED_ATTRIB = ['path', 'type']
class Elt(object):
def __init__(self, attrib):
self.attrib = attrib
class PopulateTiramisuObjects(object):
def __init__(self):
self.storage = ElementStorage()
self.booleans = []
self.force_store_values = set()
self.separators = {}
self.groups = {}
def parse_dtd(self, dtdfilename):
"""Loads the Creole DTD
:raises IOError: if the DTD is not found
:param dtdfilename: the full filename of the Creole DTD
"""
if not isfile(dtdfilename):
raise IOError(_("no such DTD file: {}").format(dtdfilename))
with open(dtdfilename, 'r') as dtdfd:
dtd = DTD(dtdfd)
for elt in dtd.iterelements():
if elt.name == 'variable':
for attr in elt.iterattributes():
if set(attr.itervalues()) == set(['True', 'False']):
self.booleans.append(attr.name)
def make_tiramisu_objects(self, xmlroot, creolefunc_file, load_extra=True):
elt = Elt({'name': 'baseoption'})
family = Family(elt, self.booleans, self.storage)
self.storage.add('.', family)
self.eosfunc = imp.load_source('eosfunc', creolefunc_file)
elts = {}
for elt in xmlroot:
elts.setdefault(elt.tag, []).append(elt)
list_elts = list(elts.keys())
if 'family' in list_elts:
list_elts.remove('family')
list_elts.insert(0, 'family')
for elt in list_elts:
xmlelts_ = elts[elt]
if elt == 'family':
xmlelts = []
actions = None
# `creole` family has to be loaded before any other family
# because `extra` family could use `creole` variables.
# `actions` family has to be loaded at the very end
# because it may use `creole` or `extra` variables
for xml in xmlelts_:
if not load_extra and xml.attrib['name'] not in ['creole', 'containers']:
continue
if xml.attrib['name'] == 'creole':
xmlelts.insert(0, xml)
elif xml.attrib['name'] == 'actions':
actions = xml
else:
xmlelts.append(xml)
if actions is not None:
xmlelts.append(actions)
else:
xmlelts = xmlelts_
for xmlelt in xmlelts:
if xmlelt.tag == 'family':
self._iter_family(xmlelt, family)
elif xmlelt.tag == 'constraints':
self._iter_constraints(xmlelt, load_extra)
else:
raise CreoleLoaderError(_('unknown tag {}').format(xmlelt.tag))
def _populate_variable(self, elt, subpath, is_follower, is_leader):
variable = Variable(elt, self.booleans, self.storage, is_follower, is_leader, self.eosfunc)
path = self._build_path(subpath, elt)
properties = variable.attrib.get('properties', [])
if 'force_store_value' in properties or "auto_freeze" in properties:
self.force_store_values.add(path)
self.storage.add(path, variable)
return variable
def _populate_family(self, elt, subpath):
if subpath is None:
force_icon = False
else:
force_icon = not subpath.startswith('containers') and not subpath.startswith('actions')
family = Family(elt, self.booleans, self.storage, force_icon)
path = self._build_path(subpath, elt)
self.storage.add(path, family)
return family
def _build_path(self, subpath, elt):
if subpath is None:
subpath = elt.attrib['name']
else:
subpath += '.' + elt.attrib['name']
return subpath
def _iter_constraints(self, xmlelt, load_extra):
for elt in xmlelt:
if elt.tag == 'fill':
self._parse_fill(elt, load_extra)
elif elt.tag == 'check':
self._parse_check(elt, load_extra)
else:
raise CreoleLoaderError(_('unknown constraint {}').format(elt.tag))
def _check_extra(self, variable, load_extra):
if load_extra:
return True
return variable.startswith('creole.') or variable.startswith('containers.')
def _parse_fill(self, elt, load_extra):
if not self._check_extra(elt.attrib['target'], load_extra):
return
callback = getattr(self.eosfunc, elt.attrib['name'])
callback_params = {}
for param in elt:
name = param.attrib.get('name', '')
if param.attrib['type'] == 'string':
value = str(param.text)
elif param.attrib['type'] == 'eole':
hidden = param.attrib['hidden']
if hidden == 'True':
hidden = False
elif hidden == 'False':
hidden = True
else:
raise CreoleLoaderError(_('unknown hidden boolean {}').format(hidden))
if not self._check_extra(param.text, load_extra):
return
value = [self.storage.get(param.text), hidden]
elif param.attrib['type'] == 'number':
value = int(param.text)
elif param.attrib['type'] == 'context':
value = (None,)
else:
raise CreoleLoaderError(_('unknown param type {} in fill to {}').format(param.attrib['type'], elt.attrib['target']))
callback_params.setdefault(name, []).append(value)
if callback_params == {}:
callback_params = None
self.storage.add_callback(elt.attrib['target'], callback, callback_params)
def _parse_check(self, elt, load_extra):
if not self._check_extra(elt.attrib['target'], load_extra):
return
all_param_eole = True
for param in elt:
if param.attrib.get('type') != 'eole':
all_param_eole = False
break
if elt.attrib['name'] == 'valid_enum':
# only for valid_enum with checkval to True
if len(elt) != 1:
raise CreoleLoaderError(_('valid_enum cannot have more than one param for {}').format(elt.attrib['target']))
if elt[0].attrib['type'] == 'eole':
proposed = elt[0].text
type_ = 'eole'
else:
#proposed_value = literal_eval(elt[0].text)
proposed_value = eval(elt[0].text)
proposed = tuple(proposed_value)
type_ = 'string'
self.storage.add_information(elt.attrib['target'], 'proposed_value', {'value': proposed, 'type': type_})
validator = getattr(self.eosfunc, elt.attrib['name'])
elif elt.attrib['name'] == 'valid_differ' and all_param_eole:
if (HIGH_COMPATIBILITY and len(elt) not in [0, 1]) or (not HIGH_COMPATIBILITY and len(elt) != 1):
raise CreoleLoaderError(_('valid_differ length should be 1'))
if HIGH_COMPATIBILITY and len(elt) == 1:
if not self._check_extra(elt[0].text, load_extra):
return
variables = [self.storage.get(elt[0].text)]
else:
variables = []
self.storage.add_consistency(elt.attrib['target'],
'not_equal',
variables,
elt.attrib['warnings_only'],
elt.attrib['transitive'])
elif elt.attrib['name'] == 'valid_networknetmask':
if len(elt) != 1:
raise CreoleLoaderError(_('valid_networknetmask length should be 1'))
if not all_param_eole:
raise CreoleLoaderError(_('valid_networknetmask must have only eole variable'))
variables = [self.storage.get(elt[0].text)]
self.storage.add_consistency(elt.attrib['target'],
'network_netmask',
variables,
elt.attrib['warnings_only'],
elt.attrib['transitive'])
elif elt.attrib['name'] == 'valid_ipnetmask':
if len(elt) != 1:
raise CreoleLoaderError(_('valid_ipnetmask length should be 1'))
if not all_param_eole:
raise CreoleLoaderError(_('valid_ipnetmask must have only eole variable'))
if not self._check_extra(elt[0].text, load_extra):
return
variables = [self.storage.get(elt[0].text)]
self.storage.add_consistency(elt.attrib['target'],
'ip_netmask',
variables,
elt.attrib['warnings_only'],
elt.attrib['transitive'])
elif elt.attrib['name'] == 'valid_broadcast':
if len(elt) != 2:
raise CreoleLoaderError(_('valid_broadcast length should be 2'))
if not all_param_eole:
raise CreoleLoaderError(_('valid_broadcast must have only eole variable'))
if not self._check_extra(elt[0].text, load_extra):
return
variables = [self.storage.get(elt[0].text)]
if not self._check_extra(elt[1].text, load_extra):
return
variables.append(self.storage.get(elt[1].text))
self.storage.add_consistency(elt.attrib['target'],
'broadcast',
variables,
elt.attrib['warnings_only'],
elt.attrib['transitive'])
elif elt.attrib['name'] == 'valid_in_network':
if len(elt) != 2:
raise CreoleLoaderError(_('valid_in_network length should be 2'))
if not all_param_eole:
raise CreoleLoaderError(_('valid_in_network must have only eole variable'))
if not self._check_extra(elt[0].text, load_extra):
return
variables = [self.storage.get(elt[0].text)]
if not self._check_extra(elt[1].text, load_extra):
return
variables.append(self.storage.get(elt[1].text))
self.storage.add_consistency(elt.attrib['target'],
'in_network',
variables,
elt.attrib['warnings_only'],
elt.attrib['transitive'])
else:
validator = getattr(self.eosfunc, elt.attrib['name'])
validator_params = {}
for param in elt:
text = param.text
if param.attrib['type'] == 'eole':
hidden = param.attrib.get('hidden', 'True')
if hidden == 'True':
hidden = False
elif hidden == 'False':
hidden = True
else:
raise CreoleLoaderError(_('unknown hidden boolean {}').format(hidden))
if not self._check_extra(text, load_extra):
return
text = [self.storage.get(text), hidden]
validator_params.setdefault(param.attrib.get('name', ''), []).append(text)
self.storage.add_validator(elt.attrib['target'], validator, validator_params)
def _iter_leader(self, leader, subpath):
subpath = self._build_path(subpath, leader)
family = Family(leader, self.booleans, self.storage)
family.set_leader()
self.storage.add(subpath, family)
leader_name = None
for var in leader:
if var.tag == 'property':
self._parse_properties(family, var)
elif var.tag == 'variable':
if leader_name is None:
leader_name = var.attrib['name']
self.groups[leader_name] = []
else:
self.groups[leader_name].append(var.attrib['name'])
self._iter_family(var, family, subpath=subpath)
else:
raise CreoleLoaderError(_('unknown tag {}').format(var.tag))
return family
def _iter_family(self, child, family, subpath=None):
if child.tag not in ['family', 'variable', 'separators', 'leader', 'property']:
raise CreoleLoaderError(_('unknown tag {}').format(child.tag))
if child.tag == 'family':
old_family = family
family = self._populate_family(child, subpath)
if old_family is not None:
old_family.add(family)
if len(child) != 0:
subpath = self._build_path(subpath, child)
for c in child:
self._iter_family(c, family, subpath=subpath)
if child.tag == 'leader':
leader = self._iter_leader(child, subpath)
family.add(leader)
elif child.tag == 'separators':
self._parse_separators(child)
elif child.tag == 'variable':
if family is None:
raise CreoleLoaderError(_('variable without family'))
is_follower = False
is_leader = False
if family.is_leader:
if child.attrib['name'] != family.attrib['name']:
is_follower = True
else:
is_leader = True
variable = self._populate_variable(child, subpath, is_follower, is_leader)
family.add(variable)
elif child.tag == 'property':
self._parse_properties(family, child)
def _parse_properties(self, family, child):
if child.get('type') == 'calculation':
kwargs = {'condition': child.attrib['source'],
'expected': ParamValue(child.attrib.get('expected'))}
if child.attrib['inverse'] == 'True':
kwargs['reverse_condition'] = ParamValue(True)
family.attrib['properties'].append((ParamValue(child.text), kwargs))
else:
family.attrib['properties'].append(child.text)
def _parse_separators(self, separators):
for separator in separators:
elt = self.storage.get(separator.attrib['name'])
never_hidden = separator.attrib.get('never_hidden')
if never_hidden == 'True':
never_hidden = True
else:
never_hidden = None
info = (separator.text, never_hidden)
self.separators[separator.attrib['name']] = info
elt.add_information('separator', info)
def build(self, persistent=False, session_id=None, meta_config=False):
if meta_config:
optiondescription = self.storage.paths['.'].get()
config = MetaConfig([],
optiondescription=optiondescription,
persistent=persistent,
session_id=session_id)
mixconfig = MixConfig(children=[],
optiondescription=optiondescription,
persistent=persistent,
session_id='m_' + session_id)
config.config.add(mixconfig)
else:
config = Config(self.storage.paths['.'].get(),
persistent=persistent,
session_id=session_id)
config.information.set('force_store_vars', self.force_store_values)
config.information.set('force_store_values', list(self.force_store_values))
# XXX really usefull?
ro_append = frozenset(config.property.getdefault('read_only', 'append') - {'force_store_value'})
rw_append = frozenset(config.property.getdefault('read_write', 'append') - {'force_store_value'})
config.property.setdefault(ro_append, 'read_only', 'append')
config.property.setdefault(rw_append, 'read_write', 'append')
config.property.read_only()
config.permissive.add('basic')
config.permissive.add('normal')
config.permissive.add('expert')
return config
class ElementStorage:
def __init__(self):
self.paths = {}
def add(self, path, elt):
if path in self.paths:
raise CreoleLoaderError(_('path already loaded {}').format(path))
self.paths[path] = elt
def add_callback(self, path, callback, callback_params):
elt = self.get(path)
elt.add_callback(callback, callback_params)
def add_information(self, path, name, information):
elt = self.get(path)
elt.add_information(name, information)
def add_validator(self, path, validator, validator_params):
elt = self.get(path)
elt.add_validator(validator, validator_params)
def add_consistency(self, path, consistence, variables, warnings_only, transitive):
elt = self.get(path)
elt.add_consistency(consistence, variables, warnings_only, transitive)
def add_requires(self, path, requires):
elt = self.get(path)
elt.add_requires(requires)
def get(self, path):
if path not in self.paths:
raise CreoleLoaderError(_('there is no element for path {}').format(path))
return self.paths[path]
class Common:
def build_properties(self):
for index, prop in enumerate(self.attrib['properties']):
if isinstance(prop, tuple):
action, kwargs = prop
kwargs['condition'] = ParamOption(self.storage.get(kwargs['condition']).get(), todict=True)
prop = Calculation(calc_value,
Params(action,
kwargs=kwargs))
self.attrib['properties'][index] = prop
if self.attrib['properties']:
self.attrib['properties'] = tuple(self.attrib['properties'])
else:
del self.attrib['properties']
class Variable(Common):
def __init__(self, elt, booleans, storage, is_follower, is_leader, eosfunc):
self.option = None
self.informations = {}
self.attrib = {}
self.callbacks = []
self.requires = []
self.validator = None
self.consistencies = []
self.attrib['properties'] = []
self.eosfunc = eosfunc
self.storage = storage
for key, value in elt.attrib.items():
if key in REMOVED_ATTRIB:
continue
#if key != 'name':
# value = unicode(value)
if key in booleans:
if value == 'True':
value = True
elif value == 'False':
value = False
else:
raise CreoleLoaderError(_('unknown value {} for {}').format(value, key))
if key == 'help':
self.add_information(key, value)
else:
self.attrib[key] = value
convert_option = CONVERT_OPTION[elt.attrib['type']]
self.object_type = convert_option['opttype']
if elt.attrib['type'] == 'choice':
if self.attrib.get('choice'):
self.attrib['values'] = getattr(self.eosfunc, self.attrib.get('choice'))
else:
self.attrib['values'] = []
for child in elt:
if child.tag == 'choice':
value = child.text
if 'type' in child.attrib and child.attrib['type'] == 'number':
value = int(value)
if value is None:
value = u''
self.attrib['values'].append(value)
self.attrib['values'] = tuple(self.attrib['values'])
for child in elt:
if child.tag == 'property':
if child.get('type') == 'calculation':
kwargs = {'condition': child.attrib['source'],
'expected': ParamValue(child.attrib.get('expected'))}
if child.attrib['inverse'] == 'True':
kwargs['reverse_condition'] = ParamValue(True)
self.attrib['properties'].append((ParamValue(child.text), kwargs))
else:
self.attrib['properties'].append(child.text)
elif child.tag == 'value':
if "type" in child.attrib:
type_ = CONVERT_OPTION[child.attrib['type']]['opttype']
else:
type_ = self.object_type
if self.attrib['multi'] and not is_follower:
if 'default' not in self.attrib:
self.attrib['default'] = []
value = convert_tiramisu_value(child.text, type_)
self.attrib['default'].append(value)
if 'default_multi' not in self.attrib and not is_leader:
self.attrib['default_multi'] = value
else:
if 'default' in self.attrib:
raise CreoleLoaderError(_('default value already set for {}'
'').format(self.attrib['path']))
value = convert_tiramisu_value(child.text, type_)
if value is None: # and (elt.attrib['type'] != 'choice' or value not in self.attrib['values']):
value = u''
if is_follower:
self.attrib['default_multi'] = value
else:
self.attrib['default'] = value
if 'initkwargs' in convert_option:
self.attrib.update(convert_option['initkwargs'])
if elt.attrib['type'] == 'symlink':
del self.attrib['properties']
del self.attrib['multi']
self.attrib['opt'] = storage.get(self.attrib['opt'])
def add_information(self, key, value):
if key in self.informations:
raise CreoleLoaderError(_('key already exists in information {}').format(key))
self.informations[key] = value
def add_callback(self, callback, callback_params):
self.callbacks.append((callback, callback_params))
def add_requires(self, requires):
self.requires.extend(requires)
def add_validator(self, validator, validator_params):
self.validator = (validator, validator_params)
def add_consistency(self, consistence, variables, warnings_only, transitive):
self.consistencies.append((consistence, variables, warnings_only, transitive))
def build_params(self, params):
if params != None:
new_params = Params()
for key, values in params.items():
new_values = []
for value in values:
if isinstance(value, list):
# retrieve object
value = ParamOption(value[0].get(), value[1])
elif value == (None,):
value = ParamContext()
else:
value = ParamValue(value)
if key == '':
args = list(new_params.args)
args.append(value)
new_params.args = tuple(args)
else:
new_params.kwargs[key] = value
return new_params
return params
def get(self):
if self.option is None:
if self.object_type is SymLinkOption:
self.attrib['opt'] = self.attrib['opt'].get()
else:
self.build_properties()
#for callback, callback_params in self.callbacks:
# self.attrib['callback'] = callback
# self.attrib['callback_params'] = self.build_params(callback_params)
#for require in self.requires:
# if isinstance(require['option'], Variable):
# require['option'] = require['option'].get()
#if self.requires != []:
# self.attrib['requires'] = self.requires
#if self.validator:
# self.attrib['validator'] = self.validator[0]
# self.attrib['validator_params'] = self.build_params(self.validator[1])
try:
option = self.object_type(**self.attrib)
except Exception as err:
import traceback
traceback.print_exc()
name = self.attrib['name']
raise CreoleLoaderError(_('cannot create option {}: {}').format(name, err))
for key, value in self.informations.items():
option.impl_set_information(key, value)
#for consistency in self.consistencies:
# options = []
# for variable in consistency[1]:
# options.append(variable.get())
# try:
# kwargs = {}
# if consistency[2] == 'True':
# kwargs['warnings_only'] = True
# if consistency[3] == 'False':
# kwargs['transitive'] = False
# option.impl_add_consistency(consistency[0], *options, **kwargs)
# except ConfigError as err:
# name = self.attrib['name']
# raise CreoleLoaderError(_('cannot load consistency for {}: {}').format(name, err))
self.option = option
return self.option
class Family(Common):
def __init__(self, elt, booleans, storage, force_icon=False):
self.requires = []
self.option = None
self.attrib = {}
self.is_leader = False
if force_icon:
self.informations = {'icon': None}
else:
self.informations = {}
self.children = []
self.storage = storage
self.attrib['properties'] = []
for key, value in elt.attrib.items():
if key in REMOVED_ATTRIB:
continue
if key in booleans:
if value == 'True':
value = True
elif value == 'False':
value = False
else:
raise CreoleLoaderError(_('unknown value {} for {}').format(value, key))
if key == 'icon':
self.add_information('icon', value)
continue
elif key == 'hidden':
if value:
self.attrib['properties'].append(key)
elif key == 'mode':
self.attrib['properties'].append(value)
elif key == 'help':
self.add_information(key, value)
else:
self.attrib[key] = value
if 'doc' not in self.attrib:
self.attrib['doc'] = u''
def add(self, child):
self.children.append(child)
def add_information(self, key, value):
if key in self.informations and not (key == 'icon' and self.informations[key] is None):
raise CreoleLoaderError(_('key already exists in information {}').format(key))
self.informations[key] = value
def set_leader(self):
self.is_leader = True
def add_requires(self, requires):
self.requires.extend(requires)
def get(self):
if self.option is None:
self.attrib['children'] = []
for child in self.children:
self.attrib['children'].append(child.get())
for require in self.requires:
if isinstance(require['option'], Variable):
require['option'] = require['option'].get()
if self.requires != []:
self.attrib['requires'] = self.requires
self.build_properties()
try:
if not self.is_leader:
option = OptionDescription(**self.attrib)
else:
option = Leadership(**self.attrib)
#option = OptionDescription(**self.attrib)
except Exception as err:
raise CreoleLoaderError(_('cannot create optiondescription {}: {}').format(self.attrib['name'], err))
for key, value in self.informations.items():
option.impl_set_information(key, value)
self.option = option
#if self.is_leader:
# self.option.impl_set_group_type(groups.leader)
return self.option