455 lines
16 KiB
Python
455 lines
16 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""
|
|
Parseur LXML des fichiers XML de collecte des variables EOLE
|
|
"""
|
|
from lxml import etree
|
|
from copy import copy
|
|
from .error import ConfigError
|
|
from .utils import string_to_bool #, get_text_node
|
|
from .config import VIRTMASTER
|
|
from .dtd_parser import CONVERT_VALUE
|
|
from pyeole.odict import OrderedDict
|
|
|
|
from .i18n import _
|
|
|
|
def parse_xml_file(filename, dtd, parse_all=True, test_duplicate=False):
    """
    Parse an XML dictionary file.

    @param filename: name of the source XML file
    @param dtd: tag descriptions, forwarded to _parse_root_node
    @param parse_all: forwarded to _parse_root_node
    @param test_duplicate: forwarded to _parse_root_node
    @return: data structure used to create the Eole objects
    @raise ConfigError: whenever reading or parsing the file fails; the
                        message carries the file name and the original error
    """
    try:
        # Only the 'end' event of the <creole> tag is reported.
        creole_events = etree.iterparse(filename, events=('end',), tag='creole')
        parsed = _parse_root_node(creole_events, dtd, parse_all, test_duplicate)
    except Exception as err:
        message = _(u"Error while parsing file {0}: {1}").format(filename, err)
        raise ConfigError(message)
    return parsed
|
|
|
|
def parse_string(xml_string, dtd, parse_all=True, test_duplicate=False):
    """
    Parse an XML dictionary given as a string.

    @param xml_string: XML dictionary as a string
    @param dtd: tag descriptions, forwarded to _parse_root_node
    @param parse_all: forwarded to _parse_root_node
    @param test_duplicate: forwarded to _parse_root_node
    @return: data structure used to create the Eole objects
    @raise ConfigError: whenever parsing fails; the message carries the
                        original error
    """
    try:
        tree_root = etree.fromstring(xml_string)
        # Walk the in-memory tree, reporting only the end of <creole> tags.
        creole_events = etree.iterwalk(tree_root, events=('end',), tag='creole')
        parsed = _parse_root_node(creole_events, dtd, parse_all, test_duplicate)
    except Exception as err:
        message = _(u"Error while parsing: {0}").format(err)
        raise ConfigError(message)
    return parsed
|
|
|
|
def _parse_root_node(document, dtd, parse_all, test_duplicate=False):
    """
    Build the dictionary data structure from the root XML element.

    @param document: iterable of (event, element) pairs; the last element
                     it yields is used as the root node
    @param dtd: mapping of tag name to its description as read by
                parse_generic; the 'files', 'container' and 'all' entries
                must provide an 'options' list
    @param parse_all: when true, the content of <files>, <container> and
                      <all> is parsed too; when false only the master and
                      <container> declarations are recorded
    @param test_duplicate: when true, refuse a variable declared twice
    @return: dict holding 'families', 'containers', 'groups', 'helps',
             'separators', the keys returned by parse_constraints,
             'families_action' when at least one action family exists, and
             one '<option>s' list per parsed container option
    @raise ConfigError: no root element, or duplicated variable name
    @raise Exception: duplicated <files>/<containers>/<all> tag, forbidden
                      or duplicated container name
    """
    def _parse_container(node, options, container_name):
        # Accumulate the entries of every option tag in ret['<option>s'].
        for name in options:
            key_name = '{0}s'.format(name)
            ret.setdefault(key_name, [])
            values = parse_generic(node.findall(name),
                                   container_name, dtd, name)
            if values != []:
                ret[key_name].extend(values)

    # Consume the whole iterator and keep the last element it reports.
    root_node = None
    for unused, first_node in document:
        root_node = first_node
    if root_node is None:
        # Used to surface as an unbound-variable error further down.
        raise ConfigError(_(u"Error: no root node found in dictionary."))

    # check for variables declared twice in the same dictionary
    if test_duplicate:
        seen_names = set()
        for var in root_node.findall('variables/family/variable'):
            name = var.attrib['name']
            if name in seen_names:
                raise ConfigError(_(u'Error, var {0} already exists in current dictionaries').format(name))
            seen_names.add(name)

    ret = {'families': parse_families(root_node)}
    families_action = parse_actions(root_node, dtd)
    if len(families_action) != 0:
        ret['families_action'] = families_action

    ret['containers'] = []
    ## <files> tag (data located on the master)
    file_node = root_node.findall('files')
    if file_node != []:
        if len(file_node) != 1:
            raise Exception(_(u"Error: extra <files> tags in dictionaries."))
        if parse_all:
            _parse_container(file_node[0], dtd['files']['options'], VIRTMASTER)
        # NOTE(review): nesting rebuilt from a listing that lost its
        # indentation -- confirm the master is only registered when a
        # <files> tag exists.
        ret['containers'].append({'name': VIRTMASTER, 'id': '1'})

    ## <containers> tag (data located inside the containers)
    containers_node = root_node.findall('containers')
    if containers_node != []:
        if len(containers_node) != 1:
            raise Exception(_(u"Error: extra <containers> tags in dictionaries."))
        container = containers_node[0]
        for container_node in container.iter('container'):
            name = container_node.attrib['name']
            if name in [VIRTMASTER, 'all']:
                raise Exception(_(u"Name '{0}' is not allowed in tag <container>.").format(name))
            # ret['containers'] holds dicts: compare against their names,
            # testing the bare string against the list never matches.
            if any(known['name'] == name for known in ret['containers']):
                raise Exception(
                    _(u"There must be only one name '{0}' in a dictionary.").format(name))
            containerid = _get_optional(container_node, 'id')
            groupid = _get_optional(container_node, 'group')
            ret['containers'].append({'name': name, 'id': containerid,
                                      'group': groupid})
            if parse_all:
                _parse_container(container_node, dtd['container']['options'], name)
        if parse_all:
            all_node = container.findall('all')
            if all_node != []:
                if len(all_node) != 1:
                    raise Exception(_(u"Error: extra <all> tags in dictionaries."))
                ret['containers'].append({'name': 'all'})
                _parse_container(all_node[0], dtd['all']['options'], 'all')

    ## constraints
    #FIXME
    ret.update(parse_constraints(root_node))

    ## variable groups
    ret['groups'] = parse_groups(root_node)

    ## help
    ret['helps'] = parse_help(root_node)

    ## separators
    ret['separators'] = parse_separators(root_node)
    return ret
|
|
|
|
|
|
def _get_boolean_attr(node, attr_name, default=False):
|
|
"""
|
|
Gestion spécifique pour les attributs booléens
|
|
Ils sont à False par défaut
|
|
"""
|
|
val = node.get(attr_name)
|
|
if default:
|
|
return str(val).lower() != 'false'
|
|
elif val is None:
|
|
return None
|
|
else:
|
|
return str(val).lower() == 'true'
|
|
|
|
|
|
def _get_optional(node, attr_name):
|
|
"""
|
|
Valeur d'un attribut optionnel
|
|
"""
|
|
return node.get(attr_name)
|
|
|
|
|
|
def _parse_value(varnode, attr='value'):
|
|
"""
|
|
récupération des valeurs d'une variable
|
|
"""
|
|
res = []
|
|
for val in varnode.findall(attr):
|
|
# FIX for <value></value> !
|
|
if val.text is not None:
|
|
res.append(val.text)
|
|
else:
|
|
res.append('')
|
|
return res
|
|
|
|
def parse_value(varnode, name):
    """
    Read the value(s) of a variable from its <value> children.

    Children without text are ignored.

    @param varnode: <variable> element
    @param name: name of the variable (unused, kept for the callers)
    @return: None when there is no usable value, the text itself when there
             is exactly one, a list of texts when there are several
    """
    # Gathering everything first avoids testing the type of the running
    # result: the former `type(res) == str` test missed non-str texts (such
    # as unicode on Python 2) and then called .append() on a string.
    texts = [val.text for val in varnode.findall('value')
             if val.text is not None]
    if not texts:
        return None
    if len(texts) == 1:
        return texts[0]
    return texts
|
|
|
|
def parse_generic(nodes, container, dtd, name, old_result=None):
    """
    Turn XML nodes into flat dicts, following the description dtd[name].

    dtd[name] is read through the keys 'type' (the node text is stored as
    'name'), 'needs' (mandatory attributes), 'optionals' (attributes with a
    'default') and 'options' (child tags parsed recursively).

    @param nodes: elements to parse
    @param container: container name stored under 'container' (skipped
                      when None)
    @param dtd: mapping of tag name to its description
    @param name: tag name of `nodes`, key into `dtd`
    @param old_result: dict inherited from the parent node during recursion
    @return: list of dicts; a node having 'options' is replaced by the
             entries of its children, each one starting from the
             attributes of the parent
    @raise Exception: text node combined with a 'name' attribute, or value
                      outside the allowed list
    @raise KeyError: mandatory attribute missing on a node
    """
    spec = dtd[name]
    parsed = []
    for node in nodes:
        # NOTE(review): an empty inherited dict is falsy, so in that case
        # 'node_name' is not recorded -- confirm this is intended.
        if old_result:
            entry = copy(old_result)
            entry['node_name'] = name
        elif container is None:
            entry = {}
        else:
            entry = {'container': container}

        if spec['type']:
            if 'name' in spec['needs'] or 'name' in spec['optionals']:
                raise Exception('PCDATA + name')
            entry['name'] = node.text

        # mandatory attributes
        for key, rule in spec['needs'].items():
            raw = node.attrib[key]
            value = CONVERT_VALUE.get(raw, raw)
            allowed = rule['values']
            if allowed is not None and value not in allowed:
                raise Exception(_(u"Value {0} not in {1}").format(value, allowed))
            entry[key] = value

        # optional attributes: only stored when a value remains
        for key, rule in spec['optionals'].items():
            raw = node.attrib.get(key, rule['default'])
            value = CONVERT_VALUE.get(raw, raw)
            if value is None:
                continue
            allowed = rule['values']
            if allowed is not None and value not in allowed:
                raise Exception(_(u"Value {0} not in {1}").format(value, allowed))
            entry[key] = value

        if spec['options'] == []:
            parsed.append(entry)
            continue
        for option in spec['options']:
            parsed.extend(parse_generic(node.findall(option), container, dtd, option, entry))
    return parsed
|
|
|
|
|
|
def parse_variables(var_node):
    """
    Parse the <variable> tags found under a node.

    @param var_node: element whose <variable> descendants are parsed
                     (parse_families passes a <family> element)
    @return: OrderedDict mapping each variable name to a dict of its
             properties, in document order
    @raise KeyError: <variable> tag without a 'name' attribute
    """
    result = OrderedDict()
    # iter() replaces the deprecated getiterator()
    for var in var_node.iter('variable'):
        # Default variables are handled in creole.loader
        name = var.attrib['name']

        typ = _get_optional(var, 'type')
        if typ is None:
            typ = 'string'

        desc = _get_optional(var, 'description')
        if isinstance(desc, unicode):
            # descriptions are stored as UTF-8 encoded byte strings
            desc = desc.encode('utf-8')

        result[name] = dict(
            value=parse_value(var, name),
            type=typ,
            description=desc,
            hidden=_get_boolean_attr(var, 'hidden'),
            multi=_get_boolean_attr(var, 'multi'),
            auto='',
            redefine=_get_boolean_attr(var, 'redefine'),
            # 'exists' is the only flag that is True when not set
            exists=_get_boolean_attr(var, 'exists', default=True),
            auto_freeze=_get_boolean_attr(var, 'auto_freeze'),
            auto_save=_get_boolean_attr(var, 'auto_save'),
            mode=_get_optional(var, 'mode'),
            mandatory=_get_boolean_attr(var, 'mandatory'),
            disabled=_get_boolean_attr(var, 'disabled', default=False),
            remove_check=_get_boolean_attr(var, 'remove_check'),
            remove_condition=_get_boolean_attr(var, 'remove_condition'),
        )
    return result
|
|
|
|
def parse_families(var_node):
    """
    Parse the families declared under 'variables/family'.

    @param var_node: element searched with the path 'variables/family'
    @return: OrderedDict mapping each family name to a dict with the keys
             'hidden', 'mode', 'vars' and 'icon'
    @raise Exception: family declared more than once
    """
    families = OrderedDict()
    for family in var_node.findall('variables/family'):
        family_name = family.attrib['name']
        if family_name in families:
            raise Exception(_(u"Family {0} is set several times.").format(family_name))
        # FIXME: mode='' was accepted by the former DOM parser
        families[family_name] = {
            'hidden': _get_boolean_attr(family, 'hidden'),
            'mode': _get_optional(family, 'mode'),
            'vars': parse_variables(family),
            'icon': _get_optional(family, 'icon'),
        }
    return families
|
|
|
|
|
|
def parse_actions(root_node, dtd):
    """
    Parse the <family_action> tags found directly under the root node.

    @param root_node: root element of the dictionary
    @param dtd: mapping of tag name to its description;
                dtd['action']['options'] lists the child tags of <action>
                handed over to parse_generic
    @return: OrderedDict mapping each action family name to a dict with the
             keys 'name', 'description', 'color', 'image' and 'action'
    @raise Exception: family declared twice, or family that does not hold
                      exactly one <action> tag
    """
    result = OrderedDict()

    def _parse_action(node, options):
        # Child tags listed in `options` first, then the attributes of the
        # <action> tag itself.
        parse = {}
        for name in options:
            key_name = '{0}'.format(name)
            parse.setdefault(key_name, []).extend(
                parse_generic(node.findall(name), None, dtd, name))
        parse['type'] = node.get("type", "custom")
        parse['title'] = node.get('title')
        parse['description'] = node.get('description')
        # 'image' and 'url' are only stored when they are set and not empty
        image = node.get('image')
        if image:
            parse['image'] = image
        url = node.get('url', None)
        if url:
            parse['url'] = url
        return parse

    for family in root_node.findall('family_action'):
        family_name = family.attrib['name']
        if family_name in result:
            raise Exception(_(u"Action Family {0} is set several times.").format(family_name))
        description = _get_optional(family, 'description')
        color = _get_optional(family, 'color')
        image = _get_optional(family, 'image')
        ## <action> tag
        action_node = family.findall('action')
        if not action_node:
            # used to fail on action_node[0] with a bare IndexError
            raise Exception(_(u"Error: missing <action> tag in action family {0}.").format(family_name))
        if len(action_node) != 1:
            raise Exception(_(u"Error: extra <action> tags in dictionaries."))
        action = _parse_action(action_node[0], dtd['action']['options'])
        result[family_name] = {'name': family_name,
                               'description': description,
                               'color': color,
                               'image': image,
                               'action': action
                               }
    return result
|
|
|
|
def parse_constraints(node):
    """
    Gather every kind of constraint declared in a dictionary.

    @param node: element handed over to parse_funcs and parse_conditions
    @return: dict with the keys 'checks', 'fills', 'autos' and 'conditions'
    """
    constraints = {}
    for key, tag in (('checks', 'check'), ('fills', 'fill'), ('autos', 'auto')):
        constraints[key] = parse_funcs(node, tag)
    constraints['conditions'] = parse_conditions(node)
    return constraints
|
|
|
|
|
|
def _parse_param(param_node):
    """
    Describe one parameter of a function.

    @param param_node: <param> element
    @return: dict with the raw attributes 'name', 'type', 'optional' and
             'hidden' (None when not set) and the node text under 'value'
    """
    param = dict((attr, _get_optional(param_node, attr))
                 for attr in ('name', 'type', 'optional', 'hidden'))
    param['value'] = param_node.text
    return param
|
|
|
|
|
|
def parse_funcs(node, func_type):
    """
    Parse the functions of a given kind declared under <constraints>.

    @param node: element searched with the path 'constraints/<func_type>'
    @param func_type: tag name of the functions to find
    @return: {target: [(function name, [parsed params], level), ...]}
    @raise KeyError: function tag without a 'name' attribute
    """
    funcs = {}
    for func in node.findall('constraints/%s' % func_type):
        # <target> child tags take precedence ...
        targets = _parse_value(func, 'target')
        if not targets:
            # ... otherwise fall back to the target= attribute, wrapped in
            # a list because the <target> form returns a list
            targets = [_get_optional(func, 'target')]
        level = _get_optional(func, 'level') or 'error'
        for target in targets:
            if target is None:
                continue
            # Build a fresh list for every target: the former single list
            # was shared by all targets and refilled on each pass, which
            # duplicated the parameters when several targets were given.
            params = [_parse_param(param) for param in func.iter('param')]
            funcs.setdefault(target, []).append((func.attrib['name'],
                                                 params, level))
    return funcs
|
|
|
|
|
|
def parse_conditions(node):
    """
    Parse every <condition> tag found under a node.

    Each target is stored with the raw value of its 'optional' attribute
    (a string), or False when the attribute is not set.

    @param node: element whose <condition> descendants are parsed
    @return: {source: [condition, ...]} where each condition is a dict with
             the keys 'name', 'family', 'variable', 'list', 'param' and
             'fallback'
    @raise Exception: unknown target type, or condition without 'source'
    """
    funcs = {}
    # iter() replaces the deprecated getiterator()
    for func in node.iter('condition'):
        targets = []
        family_targets = []
        list_targets = []
        # parameters of the function
        params = [_parse_param(param) for param in func.iter('param')]
        # targets of the dependency
        for target in func.iter('target'):
            ttype = target.get('type')
            optional = target.get('optional', False)
            if ttype == 'family':
                family_targets.append((target.text, optional))
            elif ttype in ['variable', None]:
                targets.append((target.text, optional))
            elif ttype.endswith('list'):
                # drop the 'list' suffix from the type
                list_targets.append((ttype[:-4], target.text, optional))
            else:
                raise Exception(_(u'Unknown type {0} for condition target.').format(ttype))
        funcdef = {'name': func.attrib['name'], 'family': family_targets,
                   'variable': targets, 'list': list_targets, 'param': params,
                   'fallback': _get_boolean_attr(func, 'fallback')}
        source = _get_optional(func, 'source')
        if source is None:
            raise Exception(_(u'Impossible condition without source for {0}.').format(funcdef))
        funcs.setdefault(source, []).append(funcdef)
    return funcs
|
|
|
|
|
|
def parse_groups(node):
    """
    Parse the variable groups declared under 'constraints/group'.

    @param node: element searched with the path 'constraints/group'
    @return: {master name: [slave names]}; when a master is declared
             several times the last declaration wins
    """
    return dict((group.attrib['master'], _parse_value(group, 'slave'))
                for group in node.findall('constraints/group'))
|
|
|
|
|
|
def parse_help(node):
    """
    Parse the help texts of variables and families.

    @param node: element searched with the paths 'help/variable' and
                 'help/family'
    @return: {'variables': {name: text}, 'families': {name: text}}, the
             texts being stripped of surrounding whitespace
    @raise Exception: help tag without text
    """
    def _collect(path, error_message):
        # {name: stripped text} for every element matching `path`
        helps = {}
        for element in node.findall(path):
            name = element.attrib['name']
            try:
                helps[name] = element.text.strip()
            except AttributeError:
                # element.text is None when the tag is empty
                raise Exception(error_message.format(name))
        return helps

    return {'variables': _collect('help/variable',
                                  _(u"Invalid help for variable {0}.")),
            'families': _collect('help/family',
                                 _(u"Invalid help for family {0}"))}
|
|
|
|
|
|
def parse_separators(node):
    """
    Parse the separators declared under 'variables/separators/separator'.

    @param node: element searched with that path
    @return: {name: (label, never_hidden)} where name is the 'name'
             attribute of the separator, label its stripped text ('' when
             empty) and never_hidden the value of its 'never_hidden' flag
    """
    separators = {}
    for sep in node.findall('variables/separators/separator'):
        label = sep.text.strip() if sep.text else ''
        separators[sep.attrib['name']] = (label, _get_boolean_attr(sep, 'never_hidden'))
    return separators
|
|
|