python3 and simplify import

This commit is contained in:
2019-12-02 10:31:55 +01:00
parent 5e3ff68325
commit 498e950610
31 changed files with 465 additions and 3433 deletions

4
src/rougail/__init__.py Normal file
View File

@ -0,0 +1,4 @@
from .loader import load
from .annotator import modes
__ALL__ = ('load', 'modes')

1474
src/rougail/annotator.py Normal file
View File

@ -0,0 +1,1474 @@
# coding: utf-8
from copy import copy
from collections import OrderedDict
from os.path import join, basename
from ast import literal_eval
import imp
from .i18n import _
from .utils import normalize_family
from .error import CreoleDictConsistencyError
from .xmlreflector import HIGH_COMPATIBILITY
#mode order is important
modes_level = ('basic', 'normal', 'expert')
class Mode(object):
def __init__(self, name, level):
self.name = name
self.level = level
def __cmp__(self, other):
return cmp(self.level, other.level)
def __eq__(self, other):
return self.level == other.level
def __ne__(self, other):
return self.level != other.level
def __gt__(self, other):
return other.level < self.level
def __ge__(self, other):
return not self.level < other.level
def __le__(self, other):
return not other.level < self.level
def mode_factory():
mode_obj = {}
for idx in range(len(modes_level)):
name = modes_level[idx]
mode_obj[name] = Mode(name, idx)
return mode_obj
modes = mode_factory()
# a CreoleObjSpace's attribute has some annotations
# that shall not be present in the exported (flatened) XML
ERASED_ATTRIBUTES = ('redefine', 'exists', 'fallback', 'optional', 'remove_check', 'namespace',
'remove_condition', 'path', 'instance_mode', 'index', 'is_in_leadership',
'level') # , '_real_container')
ERASED_CONTAINER_ATTRIBUTES = ('id', 'container', 'group_id', 'group', 'container_group')
NOT_NEED_ACTIVATE = ('disknod',)
FORCE_CHOICE = {'oui/non': ['oui', 'non'],
'on/off': ['on', 'off'],
'yes/no': ['yes', 'no'],
'schedule': ['none', 'daily', 'weekly', 'monthly'],
'schedulemod': ['pre', 'post']}
KEY_TYPE = {'SymLinkOption': 'symlink',
'PortOption': 'port',
'UnicodeOption': 'string',
'NetworkOption': 'network',
'NetmaskOption': 'netmask',
'URLOption': 'web_address',
'FilenameOption': 'filename'}
TYPE_PARAM_CHECK = ('string', 'python', 'eole')
TYPE_PARAM_CONDITION = ('string', 'python', 'number', 'eole')
TYPE_PARAM_FILL = ('string', 'eole', 'number', 'context')
DISKNOD_KEY_TYPE = {'major': 'number',
'minor': 'number'}
ERASED_FAMILY_ACTION_ATTRIBUTES = ('index', 'action')
FREEZE_AUTOFREEZE_VARIABLE = 'module_instancie'
class ContainerAnnotator:
"""Manage container's object
"""
def __init__(self, objectspace):
self.space = objectspace.space
self.paths = objectspace.paths
self.objectspace = objectspace
"""for example::
<service_access service='ntp'>
<port protocol='udp' service_accesslist='ntp_udp'>123</port>
<tcpwrapper>ntpd</tcpwrapper>
</service_access>
"""
self.grouplist_conditions = {}
self.convert_containers()
def convert_containers(self):
if hasattr(self.space, 'containers'):
if hasattr(self.space.containers, 'container'):
self.convert_all()
subelts = dict()
# self.space.containers.containers = self.objectspace.containers()
for idx, container in enumerate(self.space.containers.container.values()):
family = self.objectspace.family()
family.name = 'container{}'.format(idx)
family.doc = container.name
family.family = OrderedDict()
self.convert_container_to_family(family.name, family.family, container)
setattr(self.space.containers, family.name, family)
del self.space.containers.container
else:
del self.space.containers
def convert_all(self):
if hasattr(self.space.containers, 'all'):
# Remove "all" and dispatch informations in all containers
for type_, containers in vars(self.space.containers.all).items():
if type_ == 'index':
continue
if isinstance(containers, list):
for elt in containers:
for container in self.space.containers.container.values():
if container.name != 'all':
if not hasattr(container, type_):
setattr(container, type_, [])
new_elt = copy(elt)
new_elt.container = container
getattr(container, type_).append(new_elt)
else:
for name, elt in containers.items():
for container in self.space.containers.container.values():
if container.name != 'all':
if not hasattr(container, type_):
setattr(container, type_, OrderedDict())
old_container = getattr(container, type_)
if name in old_container:
raise CreoleDictConsistencyError('{}'.format(name))
new_elt = copy(elt)
new_elt.container = container
old_container[name] = new_elt
del self.space.containers.all
def convert_container_to_family(self, container_name, container_family, container):
# tranform container object to family object
# add services, service_accesses, ...
for elttype in self.objectspace.container_elt_attr_list:
if hasattr(container, elttype):
family = self.objectspace.family()
key_type_name = elttype.upper() + '_KEY_TYPE'
if key_type_name in globals():
key_type = globals()[key_type_name]
else:
key_type = {}
if elttype.endswith('s'):
family.name = elttype + 'es'
else:
family.name = elttype + 's'
values = getattr(container, elttype)
if isinstance(values, dict):
values = list(values.values())
family.family = self.make_group_from_elts(elttype,
values,
key_type,
'containers.{}.{}'.format(container_name, family.name),
True)
family.mode = None
container_family[family.name] = family
def _generate_element(self, eltname, name, value, type_, subpath, multi=False):
var_data = {'name': name, 'doc': '', 'value': value,
'auto_freeze': False, 'mode': None, 'multi': multi}
values = None
if type_ == 'string':
values = self.objectspace.forced_choice_option.get(eltname, {}).get(name)
if values is not None:
type_ = 'choice'
var_data['type'] = type_
variable = self.objectspace.variable()
if not HIGH_COMPATIBILITY:
variable.mandatory = True
for key, value in var_data.items():
if key == 'value':
if value is None:
continue
if type_ == 'symlink':
key = 'opt'
else:
# Value is a list of objects
if not multi:
val = self.objectspace.value()
val.name = value
value = [val]
else:
value_list = []
for valiter in value:
val = self.objectspace.value()
val.name = valiter.name
value_list.append(val)
value = value_list
if key == 'doc' and type_ == 'symlink':
continue
setattr(variable, key, value)
if values is not None:
choices = []
for value in values:
choice = self.objectspace.choice()
choice.name = value
choices.append(choice)
variable.choice = choices
path = '{}.{}'.format(subpath, name)
self.paths.append('variable', path, 'containers', 'containers', variable)
return variable
def _make_disknod_auto(self, type_, index, variable, container_path):
if not hasattr(self.space.constraints, 'auto'):
self.space.constraints.auto = []
auto = self.objectspace.auto()
self.objectspace.index += 1
auto.index = self.objectspace.index
auto.namespace = 'containers'
param1 = self.objectspace.param()
param1.text = type_
param2 = self.objectspace.param()
param2.text = variable.name
auto.param = [param1, param2]
auto.name = 'cdrom_minormajor'
family = 'disknod{}'.format(index)
auto.target = '{}.{}.{}'.format(container_path, family, type_)
if not hasattr(self.space, 'constraints'):
self.space.constraints = self.objectspace.constraints()
self.space.constraints.auto.append(auto)
def _make_disknod_type(self, index, variable, container_path):
auto = self.objectspace.auto()
self.objectspace.index += 1
auto.index = self.objectspace.index
auto.namespace = 'containers'
param = self.objectspace.param()
param.text = variable.name
auto.param = [param]
auto.name = 'device_type'
family = 'disknod{}'.format(index)
auto.target = '{}.{}.type'.format(container_path, family)
if not hasattr(self.space, 'constraints'):
self.space.constraints = self.objectspace.constraints()
if not hasattr(self.space.constraints, 'auto'):
self.space.constraints.auto = []
self.space.constraints.auto.append(auto)
def _update_disknod(self, disknod, index, container_path):
disknod.major = None
disknod.minor = None
disknod.type = None
self._make_disknod_auto('minor', index, disknod, container_path)
self._make_disknod_auto('major', index, disknod, container_path)
self._make_disknod_type(index, disknod, container_path)
disknod.mode = u'rwm'
disknod.permission = 'allow'
def _update_file(self, file_, index, container_path):
if not hasattr(file_, 'source'):
file_.source = basename(file_.name)
def _split_elts(self, name, key, value, elt):
"""for example::
<service_access service='ntp'>
<port protocol='udp' service_accesslist='ntp_udp'>123</port>
<tcpwrapper>ntpd</tcpwrapper>
</service_access>
builds a `service_access` object, but we need **two** objects `service_access`,
for example one for the port and one for the tcpwrapper
"""
for subelt in value:
new_elt = copy(elt)
for subsubelt in dir(subelt):
if subsubelt.startswith('_') or subsubelt == 'index':
continue
if hasattr(new_elt, subsubelt):
if hasattr(elt, 'name'):
name_ = elt.name
else:
name_ = elt.service
raise CreoleDictConsistencyError(_('attribute {} already exists '
'for {}').format(subsubelt,
name_))
setattr(new_elt, subsubelt, getattr(subelt, subsubelt))
if hasattr(new_elt, 'node_name') or hasattr(new_elt, 'name_type'):
raise CreoleDictConsistencyError(_('attribute node_name or name_type '
'already exists for {}'
'').format(name))
if hasattr(subelt, key + '_type'):
type_ = getattr(subelt, key + '_type')
setattr(new_elt, 'name_type', type_)
setattr(new_elt, 'node_name', key)
if not hasattr(new_elt, name + 'list'):
setattr(new_elt, name + 'list', '___auto_{}'.format(elt.service))
else:
self.grouplist_conditions[new_elt] = '___auto_{}'.format(elt.service)
yield new_elt
def _reorder_elts(self, name, elts, duplicate_list):
"""Reorders by index the elts (the interface,
the hosts, actions...)
"""
dict_elts = OrderedDict()
# reorder elts by index
new_elts = {}
not_indexed = []
for elt in elts:
if not hasattr(elt, 'index'):
not_indexed.append(elt)
else:
idx = elt.index
new_elts.setdefault(idx, []).append(elt)
idxes = list(new_elts.keys())
idxes.sort()
elts = not_indexed
for idx in idxes:
elts.extend(new_elts[idx])
for idx, elt in enumerate(elts):
elt_added = False
for key in dir(elt):
if key.startswith('_') or key.endswith('_type') or key in ERASED_ATTRIBUTES:
continue
value = getattr(elt, key)
if isinstance(value, list) and duplicate_list:
for new_elt in self._split_elts(name, key, value, elt):
dict_elts.setdefault(new_elt.name, []).append({'elt_name': key,
'elt': new_elt})
elt_added = True
if not elt_added:
if hasattr(elt, 'name'):
eltname = elt.name
else:
eltname = idx
dict_elts.setdefault(eltname, []).append({'elt_name': name, 'elt': elt})
result_elts = []
for elt in dict_elts.values():
result_elts.extend(elt)
return result_elts
def make_group_from_elts(self, name, elts, key_type, path, duplicate_list):
"""Splits each objects into a group (and `OptionDescription`, in tiramisu terms)
and build elements and its attributes (the `Options` in tiramisu terms)
"""
families = []
new_elts = self._reorder_elts(name, elts, duplicate_list)
for index, elt_info in enumerate(new_elts):
elt = elt_info['elt']
elt_name = elt_info['elt_name']
# try to launch _update_xxxx() function
update_elt = '_update_' + elt_name
if hasattr(self, update_elt):
getattr(self, update_elt)(elt, index, path)
variables = []
subpath = '{}.{}{}'.format(path, name, index)
listname = '{}list'.format(name)
activate_path = '.'.join([subpath, 'activate'])
if name not in NOT_NEED_ACTIVATE and elt in self.grouplist_conditions:
# FIXME transformer le activate qui disparait en boolean
self.objectspace.list_conditions.setdefault(listname,
{}).setdefault(self.grouplist_conditions[elt],
[]).append(activate_path)
for key in dir(elt):
if key.startswith('_') or key.endswith('_type') or key in ERASED_ATTRIBUTES:
continue
value = getattr(elt, key)
if isinstance(value, list) and duplicate_list:
continue
if key == 'container':
value = value.name
if name not in NOT_NEED_ACTIVATE and key == listname:
self.objectspace.list_conditions.setdefault(listname,
{}).setdefault(
value,
[]).append(activate_path)
continue
default_type = 'string'
if key in self.objectspace.booleans_attributs:
default_type = 'boolean'
type_ = key_type.get(key, default_type)
dtd_key_type = key + '_type'
if hasattr(elt, dtd_key_type):
type_ = KEY_TYPE[getattr(elt, dtd_key_type)]
multi = isinstance(value, list)
variables.append(self._generate_element(elt_name,
key,
value,
type_,
subpath,
multi))
if name not in NOT_NEED_ACTIVATE:
# FIXME ne devrait pas etre True par défaut
variables.append(self._generate_element(name, 'activate', True, 'boolean', subpath))
family = self.objectspace.family()
family.name = '{}{}'.format(name, index)
family.variable = variables
family.mode = None
self.paths.append('family', subpath, 'containers', creoleobj=family)
families.append(family)
return families
class ActionAnnotator(ContainerAnnotator):
def __init__(self, objectspace):
self.space = objectspace.space
self.paths = objectspace.paths
self.objectspace = objectspace
self.grouplist_conditions = {}
self.convert_family_action()
def convert_family_action(self):
if hasattr(self.space, 'family_action'):
actions = self.objectspace.family()
actions.name = 'actions'
actions.mode = None
actions.family = []
self.space.actions = actions
namespaces = []
for name, actions in self.space.family_action.items():
subpath = 'actions.{}'.format(normalize_family(name))
for action in actions.action:
namespace = action.namespace
if namespace in namespaces:
raise CreoleDictConsistencyError(_('only one action allow for {}'
'').format(namespace))
namespaces.append(namespace)
action.name = action.namespace
new_actions = self.make_group_from_elts('action', actions.action, {}, subpath, False)
family = self.objectspace.family()
family.name = actions.name
family.family = new_actions
family.mode = None
variables = []
for key, value in vars(actions).items():
if key not in ERASED_FAMILY_ACTION_ATTRIBUTES:
variables.append(self._generate_element('action', key, value, 'string',
subpath))
family.variable = variables
self.space.actions.family.append(family)
del self.space.family_action
class SpaceAnnotator(object):
"""Transformations applied on a CreoleObjSpace instance
"""
def __init__(self, objectspace, eosfunc_file):
self.paths = objectspace.paths
self.space = objectspace.space
self.objectspace = objectspace
self.valid_enums = {}
self.force_value = {}
self.has_calc = []
self.force_no_value = []
self.force_not_mandatory = []
if eosfunc_file is not None:
self.eosfunc = imp.load_source('eosfunc', eosfunc_file)
else:
self.eosfunc = None
if HIGH_COMPATIBILITY:
self.default_has_no_value = []
self.has_frozen_if_in_condition = []
self.default_variable_options()
self.convert_auto_freeze()
self.convert_groups()
self.filter_check()
self.filter_condition()
self.convert_valid_enums()
self.convert_check()
self.convert_autofill()
self.remove_empty_families()
self.change_variable_mode()
self.change_family_mode()
self.filter_separators()
self.absolute_path_for_symlink_in_containers()
self.convert_helps()
if hasattr(self.space, 'constraints'):
del self.space.constraints.index
if vars(self.space.constraints):
raise Exception('constraints again?')
del self.space.constraints
def absolute_path_for_symlink_in_containers(self):
if not hasattr(self.space, 'containers'):
return
families = vars(self.space.containers).values()
for family in families:
if hasattr(family, 'family'):
for fam in family.family.values():
for fam1 in fam.family:
for variable in fam1.variable:
if variable.type == 'symlink' and '.' not in variable.name:
variable.opt = self.paths.get_variable_path(variable.opt, 'creole')
def convert_helps(self):
# FIXME l'aide doit etre dans la variable!
if not hasattr(self.space, 'help'):
return
helps = self.space.help
if hasattr(helps, 'variable'):
for hlp in helps.variable.values():
variable = self.paths.get_variable_obj(hlp.name)
variable.help = hlp.text
if hasattr(helps, 'family'):
for hlp in helps.family.values():
variable = self.paths.get_family_obj(hlp.name)
variable.help = hlp.text
del self.space.help
def convert_groups(self): # pylint: disable=C0111
if hasattr(self.space, 'constraints'):
if hasattr(self.space.constraints, 'group'):
for group in self.space.constraints.group:
leader_fullname = group.master
follower_names = list(group.slave.keys())
leader_family_name = self.paths.get_variable_family_name(leader_fullname)
namespace = self.paths.get_variable_namespace(leader_fullname)
leader_name = self.paths.get_variable_name(leader_fullname)
leader_family = self.space.variables[namespace].family[leader_family_name]
leader_path = namespace + '.' + leader_family_name
is_leader = False
for variable in list(leader_family.variable.values()):
if isinstance(variable, self.objectspace.Leadership):
# append follower to an existed leadership
if variable.name == leader_name:
leader_space = variable
is_leader = True
else:
if is_leader:
if variable.name == follower_names[0]:
# followers are multi
if not variable.multi is True:
raise CreoleDictConsistencyError(_('the variable {} in a group must be multi').format(variable.name))
follower_names.remove(variable.name)
leader_family.variable.pop(variable.name)
leader_space.variable.append(variable) # pylint: disable=E1101
if namespace == 'creole':
variable_fullpath = variable.name
else:
variable_fullpath = leader_path + '.' + variable.name
self.paths.set_leader(variable_fullpath, leader_name)
if follower_names == []:
break
else:
raise CreoleDictConsistencyError(_('cannot found this follower {}').format(follower_names[0]))
if is_leader is False and variable.name == leader_name:
leader_space = self.objectspace.Leadership()
leader_space.variable = []
leader_space.name = leader_name
leader_space.hidden = variable.hidden
variable.hidden = None
self.paths.append('family', leader_path + '.' + leader_name, namespace, creoleobj=leader_space)
# manage leader's variable
if variable.multi is not True:
raise CreoleDictConsistencyError(_('the variable {} in a group must be multi').format(variable.name))
leader_family.variable[leader_name] = leader_space
leader_space.variable.append(variable) # pylint: disable=E1101
self.paths.set_leader(leader_fullname, leader_name)
leader_space.path = leader_fullname
is_leader = True
else:
raise CreoleDictConsistencyError(_('cannot found followers {}').format(follower_names))
del self.space.constraints.group
def remove_empty_families(self): # pylint: disable=C0111,R0201
if hasattr(self.space, 'variables'):
for family in self.space.variables.values():
if hasattr(family, 'family'):
space = family.family
removed_families = []
for family_name, family in space.items():
if not hasattr(family, 'variable') or len(family.variable) == 0:
removed_families.append(family_name)
del space[family_name]
# remove help too
if hasattr(self.space, 'help') and hasattr(self.space.help, 'family'):
for family in self.space.help.family.keys():
if family in removed_families:
del self.space.help.family[family]
def change_family_mode(self): # pylint: disable=C0111
if not hasattr(self.space, 'variables'):
return
for family in self.space.variables.values():
if hasattr(family, 'family'):
for family in family.family.values():
mode = modes_level[-1]
for variable in family.variable.values():
if isinstance(variable, self.objectspace.Leadership):
variable_mode = variable.variable[0].mode
variable.variable[0].mode = None
variable.mode = variable_mode
else:
variable_mode = variable.mode
if variable_mode is not None and modes[mode] > modes[variable_mode]:
mode = variable_mode
if family.name == 'Containers':
family.mode = 'normal'
else:
family.mode = mode
def _annotate_variable(self, variable, family_mode, path, is_follower=False):
if (HIGH_COMPATIBILITY and variable.type == 'choice' and variable.mode != modes_level[-1] and variable.mandatory is True and path in self.default_has_no_value):
variable.mode = modes_level[0]
if variable.type == 'choice' and is_follower and family_mode == modes_level[0] and variable.mandatory is True:
variable.mode = modes_level[0]
# if the variable is mandatory and doesn't have any value
# then the variable's mode is set to 'basic'
has_value = hasattr(variable, 'value')
if (path not in self.has_calc and variable.mandatory is True and
(not has_value or is_follower) and variable.type != 'choice'):
variable.mode = modes_level[0]
if has_value:
if not HIGH_COMPATIBILITY or (not path.startswith('creole.containers.') \
and path not in self.force_no_value and path not in self.force_not_mandatory):
variable.mandatory = True
if variable.hidden is True:
variable.frozen = True
if not variable.auto_save is True and 'force_default_on_freeze' not in vars(variable):
variable.force_default_on_freeze = True
if variable.name == 'frozen' and not variable.auto_save is True:
variable.force_default_on_freeze = True
if variable.mode != None and not is_follower and modes[variable.mode] < modes[family_mode]:
variable.mode = family_mode
if variable.mode != None and variable.mode != modes_level[0] and modes[variable.mode] < modes[family_mode]:
variable.mode = family_mode
if variable.name == "available_probes":
variable.force_default_on_freeze = False
def default_variable_options(self):
if hasattr(self.space, 'variables'):
for families in self.space.variables.values():
if hasattr(families, 'family'):
for family in families.family.values():
if hasattr(family, 'variable'):
for variable in family.variable.values():
if not hasattr(variable, 'type'):
variable.type = 'string'
if variable.type != 'symlink' and not hasattr(variable, 'description'):
variable.description = variable.name
def convert_auto_freeze(self): # pylint: disable=C0111
if hasattr(self.space, 'variables'):
for variables in self.space.variables.values():
if hasattr(variables, 'family'):
for family in variables.family.values():
if hasattr(family, 'variable'):
for variable in family.variable.values():
if variable.auto_freeze:
new_condition = self.objectspace.condition()
new_condition.name = 'auto_frozen_if_in'
new_condition.namespace = variables.name
new_condition.source = FREEZE_AUTOFREEZE_VARIABLE
new_param = self.objectspace.param()
new_param.text = 'oui'
new_condition.param = [new_param]
new_target = self.objectspace.target()
new_target.type = 'variable'
if variables.name == 'creole':
path = variable.name
else:
path = variable.namespace + '.' + family.name + '.' + variable.name
new_target.name = path
new_condition.target = [new_target]
if not hasattr(self.space.constraints, 'condition'):
self.space.constraints.condition = []
self.space.constraints.condition.append(new_condition)
def _set_valid_enum(self, variable, values, type_):
if isinstance(values, list):
variable.mandatory = True
variable.choice = []
choices = []
for value in values:
choice = self.objectspace.choice()
choice.name = str(value)
choices.append(choice.name)
choice.type = type_
variable.choice.append(choice)
if not variable.choice:
raise CreoleDictConsistencyError(_('empty valid enum is not allowed for variable {}').format(variable.name))
if hasattr(variable, 'value'):
for value in variable.value:
value.type = type_
if value.name not in choices:
raise CreoleDictConsistencyError(_('value "{}" of variable "{}" is not in list of all expected values ({})').format(value.name, variable.name, choices))
else:
new_value = self.objectspace.value()
new_value.name = values[0]
new_value.type = type_
variable.value = [new_value]
else:
# probe choice
variable.choice = values
variable.type = 'choice'
def _convert_valid_enum(self, variable, path):
if variable.type in FORCE_CHOICE:
if path in self.valid_enums:
raise CreoleDictConsistencyError(_('cannot set valid enum for variable with type {}').format(variable.type))
self._set_valid_enum(variable, FORCE_CHOICE[variable.type], 'string')
if path in self.valid_enums:
values = self.valid_enums[path]['values']
self._set_valid_enum(variable, values, variable.type)
del self.valid_enums[path]
if path in self.force_value:
new_value = self.objectspace.value()
new_value.name = self.force_value[path]
variable.value = [new_value]
del self.force_value[path]
def convert_valid_enums(self): # pylint: disable=C0111
if not hasattr(self.space, 'variables'):
return
for variables in self.space.variables.values():
namespace = variables.name
if hasattr(variables, 'family'):
for family in variables.family.values():
if hasattr(family, 'variable'):
for variable in family.variable.values():
if isinstance(variable, self.objectspace.Leadership):
for follower in variable.variable:
path = '{}.{}.{}.{}'.format(namespace, family.name, variable.name, follower.name)
self._convert_valid_enum(follower, path)
else:
path = '{}.{}.{}'.format(namespace, family.name, variable.name)
self._convert_valid_enum(variable, path)
# valid_enums must be empty now (all information are store in objects)
if self.valid_enums:
raise CreoleDictConsistencyError(_('valid_enum sets for unknown variables {}').format(self.valid_enums.keys()))
def change_variable_mode(self): # pylint: disable=C0111
if not hasattr(self.space, 'variables'):
return
for variables in self.space.variables.values():
if hasattr(variables, 'family'):
for family in variables.family.values():
family_mode = family.mode
if hasattr(family, 'variable'):
for variable in family.variable.values():
if isinstance(variable, self.objectspace.Leadership):
mode = modes_level[-1]
for follower in variable.variable:
if follower.auto_save is True:
raise CreoleDictConsistencyError(_('leader/followers {} '
'could not be '
'auto_save').format(follower.name))
if follower.auto_freeze is True:
raise CreoleDictConsistencyError(_('leader/followers {} '
'could not be '
'auto_freeze').format(follower.name))
if HIGH_COMPATIBILITY and variable.name != follower.name: # and variable.variable[0].mode != modes_level[0]:
is_follower = True
else:
is_follower = False
path = '{}.{}.{}'.format(family.path, variable.name, follower.name)
self._annotate_variable(follower, family_mode, path, is_follower)
if HIGH_COMPATIBILITY:
# leader's variable are right
if modes[variable.variable[0].mode] > modes[follower.mode]:
follower.mode = variable.variable[0].mode
else:
# auto_save's variable is set in 'basic' mode if its mode is 'normal'
if follower.auto_save is True and follower.mode != modes_level[-1]:
follower.mode = modes_level[0]
if modes[mode] > modes[follower.mode]:
mode = follower.mode
if not HIGH_COMPATIBILITY:
# the leader's mode is the lowest
variable.variable[0].mode = mode
variable.mode = variable.variable[0].mode
else:
# auto_save's variable is set in 'basic' mode if its mode is 'normal'
if variable.auto_save is True and variable.mode != modes_level[-1]:
variable.mode = modes_level[0]
# auto_freeze's variable is set in 'basic' mode if its mode is 'normal'
if variable.auto_freeze is True and variable.mode != modes_level[-1]:
variable.mode = modes_level[0]
path = '{}.{}'.format(family.path, variable.name)
self._annotate_variable(variable, family_mode, path)
def get_variable(self, name): # pylint: disable=C0111
return self.paths.get_variable_obj(name)
def convert_autofill(self): # pylint: disable=C0111
if hasattr(self.space, 'constraints'):
self.convert_duplicate_autofill(self.space.constraints)
if 'auto' in vars(self.space.constraints):
self.convert_auto(self.space.constraints.auto, self.space)
if 'fill' in vars(self.space.constraints):
self.convert_fill(self.space.constraints.fill, self.space)
def convert_duplicate_autofill(self, constraints):
""" Remove duplicate auto or fill for a variable
This variable must be redefined
"""
fills = {}
# sort fill/auto by index
if 'fill' in vars(constraints):
for idx, fill in enumerate(constraints.fill):
if fill.index in fills:
raise Exception('hu?')
fills[fill.index] = {'idx': idx, 'fill': fill, 'type': 'fill'}
if 'auto' in vars(constraints):
for idx, fill in enumerate(constraints.auto):
if fill.index in fills:
raise Exception('hu?')
fills[fill.index] = {'idx': idx, 'fill': fill, 'type': 'auto'}
indexes = list(fills.keys())
indexes.sort()
targets = {}
remove_autos = []
remove_fills = []
for idx in indexes:
fill = fills[idx]['fill']
if hasattr(fill, 'redefine'):
redefine = bool(fill.redefine)
else:
redefine = False
if fill.target in targets:
if redefine:
if targets[fill.target][1] == 'auto':
remove_autos.append(targets[fill.target][0])
else:
remove_fills.append(targets[fill.target][0])
else:
raise CreoleDictConsistencyError(_("An auto or fill already exists "
"for the target: {}").format(
fill.target))
targets[fill.target] = (fills[idx]['idx'], fills[idx]['type'])
remove_autos.sort(reverse=True)
for idx in remove_autos:
constraints.auto.pop(idx)
remove_fills.sort(reverse=True)
for idx in remove_fills:
constraints.fill.pop(idx)
def convert_auto(self, auto_space, space): # pylint: disable=C0111
for auto in auto_space:
if HIGH_COMPATIBILITY and auto.target in self.has_frozen_if_in_condition:
# if a variable has a 'frozen_if_in' condition
# then we change the 'auto' variable as a 'fill' variable
continue
# an auto is a fill with "hidden" and "frozen" properties
variable = self.get_variable(auto.target)
if variable.auto_freeze:
raise CreoleDictConsistencyError(_('variable with auto value '
'cannot be auto_freeze').format(auto.target))
if variable.auto_save:
raise CreoleDictConsistencyError(_('variable with auto value '
'cannot be auto_save').format(auto.target))
leader = self.paths.get_leader(auto.target)
if leader is None or variable.name != leader:
variable.hidden = True
else:
leadership = self.paths.get_family_obj(self.paths.get_variable_family_path(auto.target))
leadership.hidden = True
variable.frozen = True
variable.force_default_on_freeze = True
if 'fill' not in vars(space.constraints):
space.constraints.fill = []
space.constraints.fill.extend(auto_space)
del space.constraints.auto
def filter_separators(self): # pylint: disable=C0111,R0201
# FIXME devrait etre dans la variable
if not hasattr(self.space, 'variables'):
return
for family in self.space.variables.values():
if (hasattr(family, 'separators') and hasattr(family.separators, 'separator')):
space = family.separators.separator
names = []
for idx, separator in enumerate(space):
namespace = self.paths.get_variable_namespace(separator.name)
subpath = self.paths.get_variable_path(separator.name, namespace)
separator.name = subpath
if separator.name in names:
raise CreoleDictConsistencyError(_('{} already has a separator').format(separator.name))
names.append(separator.name)
def load_params_in_validenum(self, param):
if param.type in ['string', 'python', 'number']:
if not hasattr(param, 'text') and (param.type == 'python' or param.type == 'number'):
raise CreoleDictConsistencyError(_("All '{}' variables shall be set in order to calculate {}").format(param.type, 'valid_enum'))
if param.type in ['string', 'number']:
try:
values = literal_eval(param.text)
except ValueError:
raise CreoleDictConsistencyError(_('Cannot load {}').format(param.text))
elif param.type == 'python':
try:
values = eval(param.text, {'eosfunc': self.eosfunc, '__builtins__': {'range': range, 'str': str}})
#FIXME : eval('[str(i) for i in range(3, 13)]', {'eosfunc': eosfunc, '__builtins__': {'range': range, 'str': str}})
except NameError:
raise CreoleDictConsistencyError(_('The function {} is unknown').format(param.text))
if not isinstance(values, list):
raise CreoleDictConsistencyError(_('Function {} shall return a list').format(param.text))
new_values = []
for val in values:
new_values.append(val)
values = new_values
else:
values = param.text
return values
def filter_check(self): # pylint: disable=C0111
# valid param in check
if not hasattr(self.space, 'constraints') or not hasattr(self.space.constraints, 'check'):
return
space = self.space.constraints.check
remove_indexes = []
for check_idx, check in enumerate(space):
namespace = check.namespace
if hasattr(check, 'param'):
param_option_indexes = []
for idx, param in enumerate(check.param):
if param.type not in TYPE_PARAM_CHECK:
raise CreoleDictConsistencyError(_('cannot use {} type as a param in check for {}').format(param.type, check.target))
if param.type == 'eole':
if HIGH_COMPATIBILITY and param.text.startswith('container_ip'):
if param.optional is True:
param_option_indexes.append(idx)
try:
param.text = self.paths.get_variable_path(param.text, namespace)
except CreoleDictConsistencyError as err:
if param.optional is True:
param_option_indexes.append(idx)
else:
raise err
param_option_indexes = list(set(param_option_indexes))
param_option_indexes.sort(reverse=True)
for idx in param_option_indexes:
check.param.pop(idx)
if not HIGH_COMPATIBILITY and check.param == []:
remove_indexes.append(check_idx)
remove_indexes.sort(reverse=True)
for idx in remove_indexes:
del space[idx]
variables = {}
for index, check in enumerate(space):
namespace = check.namespace
if HIGH_COMPATIBILITY:
if not self.paths.path_is_defined(check.target):
continue
check.is_in_leadership = self.paths.get_leader(check.target) != None
# let's replace the target by the path
check.target = self.paths.get_variable_path(check.target, namespace)
if check.target not in variables:
variables[check.target] = []
variables[check.target].append((index, check))
# remove check already set for a variable
remove_indexes = []
for checks in variables.values():
names = {}
for idx, check in checks:
if HIGH_COMPATIBILITY and check.name == 'valid_enum':
redefine = True
else:
redefine = False
#redefine = bool(check.redefine)
if redefine and check.name in names:
remove_indexes.append(names[check.name])
del names[check.name]
names[check.name] = idx
del check.index
remove_indexes.sort(reverse=True)
for idx in remove_indexes:
del space[idx]
remove_indexes = []
for idx, check in enumerate(space):
if not check.name in dir(self.eosfunc):
raise CreoleDictConsistencyError(_('cannot find check function {}').format(check.name))
#is_probe = not check.name in self.eosfunc.func_on_zephir_context
#if is_probe:
# raise CreoleDictConsistencyError(_('cannot have a check with probe function ({})').format(check.name))
if check.name == 'valid_enum':
proposed_value_type = False
remove_params = []
for param_idx, param in enumerate(check.param):
if hasattr(param, 'name') and param.name == 'checkval':
try:
proposed_value_type = self.objectspace._convert_boolean(param.text) == False
remove_params.append(param_idx)
except TypeError as err:
raise CreoleDictConsistencyError(_('cannot load checkval value for variable {}: {}').format(check.target, err))
if proposed_value_type:
# no more supported
raise CreoleDictConsistencyError(_('cannot load checkval value for variable {}, no more supported').format(check.target))
remove_params.sort(reverse=True)
for param_idx in remove_params:
del check.param[param_idx]
if len(check.param) != 1:
raise CreoleDictConsistencyError(_('cannot set more than one param '
'for valid_enum for variable {}'
'').format(check.target))
param = check.param[0]
if proposed_value_type:
if param.type != 'eole':
try:
values = self.load_params_in_validenum(param)
except NameError as err:
raise CreoleDictConsistencyError(_('cannot load value for variable {}: {}').format(check.target, err))
add_value = True
if HIGH_COMPATIBILITY and check.is_in_leadership:
add_value = False
if add_value and values:
self.force_value[check.target] = values[0]
else:
if check.target in self.valid_enums:
raise CreoleDictConsistencyError(_('valid_enum already set for {}'
'').format(check.target))
values = self.load_params_in_validenum(param)
self.valid_enums[check.target] = {'type': param.type,
'values': values}
remove_indexes.append(idx)
remove_indexes.sort(reverse=True)
for idx in remove_indexes:
del space[idx]
#convert level to "warnings_only" and hidden to "transitive"
for check in space:
if check.level == 'warning':
check.warnings_only = True
else:
check.warnings_only = False
check.level = None
if hasattr(check, 'param'):
for param in check.param:
if not param.hidden is True:
check.transitive = False
param.hidden = None
if not self.space.constraints.check:
del self.space.constraints.check
def convert_check(self):
if not hasattr(self.space, 'constraints') or not hasattr(self.space.constraints, 'check'):
return
for check in self.space.constraints.check:
variable = self.paths.get_variable_obj(check.target)
check_ = self.objectspace.check()
name = check.name
if name == 'valid_differ':
name = 'valid_not_equal'
elif name == 'valid_network_netmask':
params_len = 1
if len(check.param) != params_len:
raise CreoleLoaderError(_('{} must have {} param').format(name, params_len))
elif name == 'valid_ipnetmask':
params_len = 1
if len(check.param) != params_len:
raise CreoleLoaderError(_('{} must have {} param').format(name, params_len))
name = 'valid_ip_netmask'
elif name == 'valid_broadcast':
params_len = 2
if len(check.param) != params_len:
raise CreoleLoaderError(_('{} must have {} param').format(name, params_len))
elif name == 'valid_in_network':
params_len = 2
if len(check.param) != params_len:
raise CreoleLoaderError(_('{} must have {} param').format(name, params_len))
check_.name = name
check_.warnings_only = check.warnings_only
if hasattr(check, 'param'):
check_.param = check.param
if not hasattr(variable, 'check'):
variable.check = []
variable.check.append(check_)
del self.space.constraints.check
def convert_fill(self, fill_space, space): # pylint: disable=C0111,R0912
fills = {}
# sort fill/auto by index
for idx, fill in enumerate(fill_space):
fills[fill.index] = {'idx': idx, 'fill': fill}
del fill.index
indexes = list(fills.keys())
indexes.sort()
del_idx = []
for idx in indexes:
fill = fills[idx]['fill']
variable = self.get_variable(fill.target)
if hasattr(variable, 'value'):
del variable.value
namespace = fill.namespace
# let's replace the target by the path
fill.target = self.paths.get_variable_path(fill.target, namespace)
if not fill.name in dir(self.eosfunc):
raise CreoleDictConsistencyError(_('cannot find fill function {}').format(fill.name))
#is_probe = not fill.name in self.eosfunc.func_on_zephir_context
if hasattr(fill, 'param'):
for param in fill.param:
if param.type not in TYPE_PARAM_FILL:
raise CreoleDictConsistencyError(_('cannot use {} type as a param '
'in a fill/auto').format(param.type))
param_option_indexes = []
for fill_idx, param in enumerate(fill.param):
if not hasattr(param, 'text') and \
(param.type == 'eole' or param.type == 'number' or \
#param.type == 'container' or param.type == 'python'):
param.type == 'python'):
raise CreoleDictConsistencyError(_("All '{}' variables shall be set in "
"order to calculate {}").format(
param.type,
fill.target))
# if param.type == 'container':
# param.type = 'eole'
# param.text = 'container_ip_{}'.format(param.text)
if param.type == 'eole':
#if is_probe:
# raise CreoleDictConsistencyError(_('Function {0} used to calculate {1} '
# 'is executed on remote server, '
# 'so cannot depends to an '
# 'other variable'
# ).format(fill.name, fill.target))
if HIGH_COMPATIBILITY and param.text.startswith('container_ip'):
if param.optional is True:
param_option_indexes.append(fill_idx)
try:
param.text = self.paths.get_variable_path(param.text, namespace)
except CreoleDictConsistencyError as err:
if param.optional is True:
param_option_indexes.append(fill_idx)
else:
raise err
param_option_indexes = list(set(param_option_indexes))
param_option_indexes.sort(reverse=True)
for param_idx in param_option_indexes:
fill.param.pop(param_idx)
self.has_calc.append(fill.target)
#if is_probe:
# variable.force_default_on_freeze = False
# self.objectspace.probe_variables.append(fill)
# del_idx.append(fills[idx]['idx'])
del_idx.sort(reverse=True)
for idx in del_idx:
space.constraints.fill.pop(idx)
for fill in space.constraints.fill:
variable = self.paths.get_variable_obj(fill.target)
value = self.objectspace.value()
value.type = 'calculation'
value.name = fill.name
if hasattr(fill, 'param'):
for param in fill.param:
if param.hidden is True:
param.transitive = False
param.hidden = None
value.param = fill.param
if not hasattr(variable, 'value'):
variable.value = []
variable.value.append(value)
self.force_not_mandatory.append(fill.target)
del space.constraints.fill
def filter_targets(self): # pylint: disable=C0111
for condition_idx, condition in enumerate(self.space.constraints.condition):
namespace = condition.namespace
del_idx = []
for idx, target in enumerate(condition.target):
if target.type == 'variable':
if (hasattr(target, 'optional') and target.optional is True and
not self.paths.path_is_defined(target.name)):
del_idx.append(idx)
continue
if condition.source == target.name:
raise CreoleDictConsistencyError(_('target name and source name must be different: {}').format(condition.source))
target.name = self.paths.get_variable_path(target.name, namespace)
elif target.type == 'family':
try:
target.name = self.paths.get_family_path(target.name, namespace)
except KeyError:
raise CreoleDictConsistencyError(_('cannot found family {}').format(target.name))
del_idx = list(set(del_idx))
del_idx.sort(reverse=True)
for idx in del_idx:
condition.target.pop(idx)
def filter_condition_servicelist(self):
# automatic generation of the service_access lists
# and the service_restriction lists from the servicelist
for condition in self.space.constraints.condition:
if hasattr(condition, 'target'):
new_targets = []
for target in condition.target:
if target.type == 'servicelist':
new_target = copy(target)
new_target.type = 'service_accesslist'
new_target.name = '___auto_{}'.format(new_target.name)
new_targets.append(new_target)
new_target = copy(target)
new_target.type = 'service_restrictionlist'
new_target.name = '___auto_{}'.format(new_target.name)
new_targets.append(new_target)
condition.target.extend(new_targets)
def check_condition_without_target(self):
for condition in self.space.constraints.condition:
if not hasattr(condition, 'target'):
raise CreoleDictConsistencyError(_('target is mandatory in condition'))
def check_condition_fallback_not_exists(self, fallback_variables, fallback_lists):
# a condition with a fallback **and** the source variable doesn't exist
remove_conditions = []
for idx, condition in enumerate(self.space.constraints.condition):
if (hasattr(condition, 'fallback') and condition.fallback is True and
not self.paths.path_is_defined(condition.source)):
for target in condition.target:
if target.type in ['variable', 'family']:
name = target.name.split('.')[-1]
if target.type == 'variable':
variable = self.get_variable(name)
else:
variable = self.paths.get_family_obj(name)
if condition.name in ['disabled_if_in']:
variable.disabled = True
if condition.name in ['mandatory_if_in']:
variable.mandatory = True
if condition.name in ['disabled_if_in', 'disabled_if_not_in',
'frozen_if_in', 'frozen_if_not_in']:
variable.hidden = False
if HIGH_COMPATIBILITY:
fallback_variables.append(name)
else:
listname = target.type
if not listname.endswith('list'):
raise Exception('not yet implemented')
listvars = self.objectspace.list_conditions.get(listname,
{}).get(target.name)
if listvars:
for listvar in listvars:
try:
variable = self.get_variable(listvar)
except CreoleDictConsistencyError:
variable = self.paths.get_family_obj(listvar)
if condition.name in ['disabled_if_in']:
variable.disabled = True
if condition.name in ['mandatory_if_in']:
variable.mandatory = True
if condition.name in ['disabled_if_in', 'disabled_if_not_in',
'frozen_if_in', 'frozen_if_not_in']:
variable.hidden = False
fallback_lists.append(listvar)
remove_conditions.append(idx)
remove_conditions = list(set(remove_conditions))
remove_conditions.sort(reverse=True)
for idx in remove_conditions:
self.space.constraints.condition.pop(idx)
def convert_xxxlist_to_variable(self, fallback_lists): # pylint: disable=C0111
# transform *list to variable or family
for condition_idx, condition in enumerate(self.space.constraints.condition):
new_targets = []
remove_targets = []
for target_idx, target in enumerate(condition.target):
if target.type not in ['variable', 'family']:
listname = target.type
if not listname.endswith('list'):
raise Exception('not yet implemented')
listvars = self.objectspace.list_conditions.get(listname,
{}).get(target.name)
if listvars:
for listvar in listvars:
if listvar in fallback_lists:
continue
try:
variable = self.get_variable(listvar)
type_ = 'variable'
except CreoleDictConsistencyError:
variable = self.paths.get_family_obj(listvar)
type_ = 'family'
new_target = self.objectspace.target()
new_target.type = type_
new_target.name = listvar
new_target.index = target.index
new_targets.append(new_target)
remove_targets.append(target_idx)
remove_targets = list(set(remove_targets))
remove_targets.sort(reverse=True)
for target_idx in remove_targets:
condition.target.pop(target_idx)
condition.target.extend(new_targets)
def check_condition(self):
# if condition.name == 'hidden_if_in':
# condition.name = 'disabled_if_in'
# elif condition.name == 'hidden_if_not_in':
# condition.name = 'disabled_if_not_in'
for condition in self.space.constraints.condition:
if condition.name not in ['disabled_if_in', 'disabled_if_not_in', 'frozen_if_in', 'auto_frozen_if_in',
'frozen_if_not_in', 'mandatory_if_in', 'mandatory_if_not_in']:
raise CreoleDictConsistencyError(_('unknown condition {}').format(condition.name))
def check_params(self):
for condition in self.space.constraints.condition:
for param in condition.param:
if param.type not in TYPE_PARAM_CONDITION:
raise CreoleDictConsistencyError(_('cannot use {} type as a param '
'in a condition').format(param.type))
def check_choice_option_condition(self, force_remove_targets):
# remove condition for ChoiceOption that don't have param
remove_conditions = []
for condition_idx, condition in enumerate(self.space.constraints.condition):
namespace = condition.namespace
src_variable = self.paths.get_variable_obj(condition.source)
condition.source = self.paths.get_variable_path(condition.source, namespace, allow_source=True)
valid_enum = None
if condition.source in self.valid_enums and \
self.valid_enums[condition.source]['type'] == 'string':
valid_enum = self.valid_enums[condition.source]['values']
if src_variable.type in FORCE_CHOICE:
valid_enum = FORCE_CHOICE[src_variable.type]
if valid_enum is not None:
remove_param = []
for param_idx, param in enumerate(condition.param):
if param.text not in valid_enum:
remove_param.append(param_idx)
remove_param.sort(reverse=True)
for idx in remove_param:
del condition.param[idx]
if condition.param == []:
for target in condition.target:
if target.name.startswith('creole.'):
name = target.name.split('.')[-1]
else:
name = target.name
if target.type == 'variable':
variable = self.get_variable(name)
else:
variable = self.paths.get_family_obj(name)
if condition.name == 'disabled_if_not_in':
variable.disabled = True
force_remove_targets.setdefault(condition.name,
[]).append(target.name)
elif condition.name == 'frozen_if_not_in':
variable.hidden = True
force_remove_targets.setdefault(condition.name,
[]).append(target.name)
elif condition.name == 'mandatory_if_not_in':
variable.mandatory = True
force_remove_targets.setdefault(condition.name,
[]).append(target.name)
elif HIGH_COMPATIBILITY and condition.name == 'disabled_if_in':
variable.hidden = False
remove_conditions.append(condition_idx)
remove_conditions = list(set(remove_conditions))
remove_conditions.sort(reverse=True)
for idx in remove_conditions:
self.space.constraints.condition.pop(idx)
def manage_variable_property(self, force_remove_targets, fallback_variables):
for condition in self.space.constraints.condition:
remove_targets = []
#parse each variable and family
for target_idx, target in enumerate(condition.target):
if target.name in force_remove_targets.get(condition.name, []):
remove_targets.append(target_idx)
if target.name.startswith('creole.'):
name = target.name.split('.')[-1]
else:
name = target.name
if target.type == 'variable':
variable = self.get_variable(name)
else:
variable = self.paths.get_family_obj(name)
if name in fallback_variables:
remove_targets.append(target_idx)
continue
if condition.name in ['disabled_if_in', 'disabled_if_not_in',
'frozen_if_in', 'frozen_if_not_in']:
variable.hidden = False
if condition.name in ['mandatory_if_in', 'mandatory_if_not_in']:
variable.mandatory = False
if HIGH_COMPATIBILITY and condition.name in ['frozen_if_in',
'frozen_if_not_in']:
self.has_frozen_if_in_condition.append(name)
if condition.name in ['mandatory_if_in', 'mandatory_if_not_in']:
self.force_not_mandatory.append(target.name)
remove_targets = list(set(remove_targets))
remove_targets.sort(reverse=True)
for target_idx in remove_targets:
condition.target.pop(target_idx)
def remove_condition_with_empty_target(self):
remove_conditions = []
for condition_idx, condition in enumerate(self.space.constraints.condition):
if not condition.target:
remove_conditions.append(condition_idx)
remove_conditions = list(set(remove_conditions))
remove_conditions.sort(reverse=True)
for idx in remove_conditions:
self.space.constraints.condition.pop(idx)
def filter_condition(self): # pylint: disable=C0111
if not hasattr(self.space, 'constraints') or not hasattr(self.space.constraints, 'condition'):
return
fallback_variables = []
fallback_lists = []
force_remove_targets = {}
self.check_condition()
self.check_params()
self.check_condition_without_target()
self.filter_condition_servicelist()
self.check_condition_fallback_not_exists(fallback_variables, fallback_lists)
self.filter_targets()
self.convert_xxxlist_to_variable(fallback_lists)
self.check_choice_option_condition(force_remove_targets)
self.manage_variable_property(force_remove_targets, fallback_variables)
self.remove_condition_with_empty_target()
for condition in self.space.constraints.condition:
if condition.name == 'disabled_if_in':
actions = ['disabled']
inverse = False
elif condition.name == 'disabled_if_not_in':
actions = ['disabled']
inverse = True
elif condition.name == 'frozen_if_in':
actions = ['frozen', 'hidden', 'force_default_on_freeze']
inverse = False
elif condition.name == 'frozen_if_not_in':
actions = ['frozen', 'hidden', 'force_default_on_freeze']
inverse = True
elif condition.name == 'mandatory_if_in':
actions = ['mandatory']
inverse = False
elif condition.name == 'mandatory_if_not_in':
actions = ['mandatory']
inverse = True
elif condition.name == 'auto_frozen_if_in':
actions = ['auto_frozen']
inverse = True
for param in condition.param:
if hasattr(param, 'text'):
param = param.text
else:
param = None
for target in condition.target:
if target.name.startswith('creole.'):
name = target.name.split('.')[-1]
else:
name = target.name
if target.type == 'variable':
variable = self.get_variable(name)
else:
variable = self.paths.get_family_obj(name)
if not hasattr(variable, 'property'):
variable.property = []
for action in actions:
prop = self.objectspace.property_()
prop.type = 'calculation'
prop.inverse = inverse
prop.source = condition.source
prop.expected = param
prop.name = action
variable.property.append(prop)
del self.space.constraints.condition

25
src/rougail/config.py Normal file
View File

@ -0,0 +1,25 @@
# -*- coding: utf-8 -*-
"""
fichier de configuration pour créole
"""
from os.path import join, isfile
eoledir = '/usr/share/eole'
eoleroot = join(eoledir, 'creole')
# chemin du répertoire source des fichiers templates
templatedir = '/var/lib/creole'
distrib_dir = join(eoleroot, 'distrib')
patch_dir = join(eoleroot, 'patch')
# repertoire de la dtd
dtddir = '/usr/share/creole'
if isfile('data/creole.dtd'):
dtdfilename = 'data/creole.dtd'
elif isfile('../creole/data/creole.dtd'):
dtdfilename = '../creole/data/creole.dtd'
else:
dtdfilename = join(dtddir, 'creole.dtd')

36
src/rougail/error.py Normal file
View File

@ -0,0 +1,36 @@
# -*- coding: utf-8 -*-
"""
Erreurs Creole
"""
class ConfigError(Exception):
pass
class FileNotFound(ConfigError):
pass
class TemplateError(ConfigError):
pass
class TemplateDisabled(TemplateError):
"""Template is disabled.
"""
pass
class CreoleOperationError(Exception):
"""Type error or value Error for Creole variable's type or values
"""
class SpaceObjShallNotBeUpdated(Exception):
"""Specific behavior in case of the presence or not
of an object in the space object
"""
class CreoleDictConsistencyError(Exception):
"""It's not only that the Creole XML is valid against the Creole DTD
it's that it is not consistent.
"""

52
src/rougail/i18n.py Normal file
View File

@ -0,0 +1,52 @@
# -*- coding: UTF-8 -*-
# Copyright (C) 2012-2013 Team tiramisu (see AUTHORS for all contributors)
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by the
# Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# The original `Config` design model is unproudly borrowed from
# the rough gus of pypy: pypy: http://codespeak.net/svn/pypy/dist/pypy/config/
# the whole pypy projet is under MIT licence
"internationalisation utilities"
import gettext
import os
import sys
import locale
# Application Name
APP_NAME = 'creole'
# Traduction dir
APP_DIR = os.path.join(sys.prefix, 'share')
LOCALE_DIR = os.path.join(APP_DIR, 'locale')
# Default Lanugage
DEFAULT_LANG = os.environ.get('LANG', '').split(':')
DEFAULT_LANG += ['en_US']
languages = []
lc, encoding = locale.getdefaultlocale()
if lc:
languages = [lc]
languages += DEFAULT_LANG
mo_location = LOCALE_DIR
gettext.find(APP_NAME, mo_location)
gettext.textdomain(APP_NAME)
gettext.bind_textdomain_codeset(APP_NAME, "UTF-8")
gettext.translation(APP_NAME, fallback=True)
t = gettext.translation(APP_NAME, fallback=True)
_ = t.gettext

554
src/rougail/loader.py Normal file
View File

@ -0,0 +1,554 @@
"""creole loader
flattened XML specific
"""
from os.path import join, isfile, isdir
from os import listdir
#from ast import literal_eval
from lxml.etree import parse, DTD
from tiramisu.option import (StrOption, OptionDescription, PortOption,
IntOption, ChoiceOption, BoolOption, SymLinkOption, IPOption,
NetworkOption, NetmaskOption, DomainnameOption, BroadcastOption,
URLOption, EmailOption, FilenameOption, UsernameOption, DateOption,
PasswordOption, BoolOption, MACOption, Leadership)
from tiramisu import Config, MetaConfig, MixConfig
from tiramisu.setting import groups
from tiramisu.error import ConfigError
from tiramisu.setting import owners
from tiramisu import Params, ParamOption, ParamValue, ParamContext, Calculation, calc_value
from .config import dtdfilename
from .i18n import _
#For compatibility
from .xmlreflector import HIGH_COMPATIBILITY
#from . import eosfunc
from .objspace import CreoleObjSpace
import imp
class CreoleLoaderError(Exception):
pass
def convert_tiramisu_value(value, obj):
"""
convertit les variables dans le bon type si nécessaire
"""
if value is None:
return value
def _convert_boolean(value):
if isinstance(value, bool):
return value
prop = {'True': True,
'False': False,
'None': None}
if value not in prop:
raise Exception('unknown value {} while trying to cast {} to boolean'.format(value, obj))
return prop[value]
func = {IntOption: int, StrOption: str, PortOption: str,
DomainnameOption: str, EmailOption: str, URLOption: str,
IPOption: str, NetmaskOption: str, NetworkOption: str,
BroadcastOption: str, FilenameOption: str,
BoolOption: _convert_boolean}.get(obj, None)
if func is None:
return value
if isinstance(value, list):
return [func(val) for val in value]
else:
return func(value)
CONVERT_OPTION = {'number': dict(opttype=IntOption),
'choice': dict(opttype=ChoiceOption),
'string': dict(opttype=StrOption),
'password': dict(opttype=PasswordOption),
'mail': dict(opttype=EmailOption),
'boolean': dict(opttype=BoolOption),
'symlink': dict(opttype=SymLinkOption),
'filename': dict(opttype=FilenameOption),
'date': dict(opttype=DateOption),
'unix_user': dict(opttype=UsernameOption),
'ip': dict(opttype=IPOption, initkwargs={'allow_reserved': True}),
'local_ip': dict(opttype=IPOption, initkwargs={'private_only': True, 'warnings_only': True}),
'netmask': dict(opttype=NetmaskOption),
'network': dict(opttype=NetworkOption),
'broadcast': dict(opttype=BroadcastOption),
'netbios': dict(opttype=DomainnameOption, initkwargs={'type': 'netbios', 'warnings_only': True}),
'domain': dict(opttype=DomainnameOption, initkwargs={'type': 'domainname', 'allow_ip': True, 'allow_without_dot': True}),
'domain_strict': dict(opttype=DomainnameOption, initkwargs={'type': 'domainname', 'allow_ip': False}),
'hostname': dict(opttype=DomainnameOption, initkwargs={'type': 'hostname', 'allow_ip': True}),
'hostname_strict': dict(opttype=DomainnameOption, initkwargs={'type': 'hostname', 'allow_ip': False}),
'web_address': dict(opttype=URLOption, initkwargs={'allow_ip': True, 'allow_without_dot': True}),
'port': dict(opttype=PortOption, initkwargs={'allow_private': True}),
'mac': dict(opttype=MACOption)
}
class Elt(object):
def __init__(self, attrib):
self.attrib = attrib
class PopulateTiramisuObjects(object):
def __init__(self):
self.storage = ElementStorage()
self.booleans = []
self.force_store_values = set()
self.separators = {}
self.groups = {}
def parse_dtd(self, dtdfilename):
"""Loads the Creole DTD
:raises IOError: if the DTD is not found
:param dtdfilename: the full filename of the Creole DTD
"""
if not isfile(dtdfilename):
raise IOError(_("no such DTD file: {}").format(dtdfilename))
with open(dtdfilename, 'r') as dtdfd:
dtd = DTD(dtdfd)
for elt in dtd.iterelements():
if elt.name == 'variable':
for attr in elt.iterattributes():
if set(attr.itervalues()) == set(['True', 'False']):
self.booleans.append(attr.name)
def make_tiramisu_objects(self, xmlroot, creolefunc_file):
elt = Elt({'name': 'baseoption'})
family = Family(elt, self.booleans, self.storage)
self.storage.add('.', family)
self.eosfunc = imp.load_source('eosfunc', creolefunc_file)
elts = {}
for elt in xmlroot:
elts.setdefault(elt.tag, []).append(elt)
list_elts = list(elts.keys())
if 'family' in list_elts:
list_elts.remove('family')
list_elts.insert(0, 'family')
for elt in list_elts:
xmlelts_ = elts[elt]
if elt == 'family':
xmlelts = []
actions = None
# `creole` family has to be loaded before any other family
# because `extra` family could use `creole` variables.
# `actions` family has to be loaded at the very end
# because it may use `creole` or `extra` variables
for xml in xmlelts_:
if xml.attrib['name'] == 'creole':
xmlelts.insert(0, xml)
elif xml.attrib['name'] == 'actions':
actions = xml
else:
xmlelts.append(xml)
if actions is not None:
xmlelts.append(actions)
else:
xmlelts = xmlelts_
for xmlelt in xmlelts:
if xmlelt.tag != 'family':
raise CreoleLoaderError(_('unknown tag {}').format(xmlelt.tag))
self._iter_family(xmlelt, family)
def _populate_variable(self, elt, subpath, is_follower, is_leader):
variable = Variable(elt, self.booleans, self.storage, is_follower, is_leader, self.eosfunc)
path = self._build_path(subpath, elt)
properties = variable.attrib.get('properties', [])
if 'force_store_value' in properties or "auto_freeze" in properties:
self.force_store_values.add(path)
self.storage.add(path, variable)
return variable
def _populate_family(self, elt, subpath):
if subpath is None:
force_icon = False
else:
force_icon = not subpath.startswith('containers') and not subpath.startswith('actions')
family = Family(elt, self.booleans, self.storage, force_icon)
path = self._build_path(subpath, elt)
self.storage.add(path, family)
return family
def _build_path(self, subpath, elt):
if subpath is None:
subpath = elt.attrib['name']
else:
subpath += '.' + elt.attrib['name']
return subpath
def _iter_leader(self, leader, subpath):
subpath = self._build_path(subpath, leader)
family = Family(leader, self.booleans, self.storage)
family.set_leader()
self.storage.add(subpath, family)
leader_name = None
for var in leader:
if var.tag == 'property':
self._parse_properties(family, var)
elif var.tag == 'variable':
if leader_name is None:
leader_name = var.attrib['name']
self.groups[leader_name] = []
else:
self.groups[leader_name].append(var.attrib['name'])
self._iter_family(var, family, subpath=subpath)
else:
raise CreoleLoaderError(_('unknown tag {}').format(var.tag))
return family
def _iter_family(self, child, family, subpath=None):
if child.tag not in ['family', 'variable', 'separators', 'leader', 'property']:
raise CreoleLoaderError(_('unknown tag {}').format(child.tag))
if child.tag == 'family':
old_family = family
family = self._populate_family(child, subpath)
if old_family is not None:
old_family.add(family)
if len(child) != 0:
subpath = self._build_path(subpath, child)
for c in child:
self._iter_family(c, family, subpath=subpath)
elif child.tag == 'leader':
leader = self._iter_leader(child, subpath)
family.add(leader)
elif child.tag == 'separators':
self._parse_separators(child)
elif child.tag == 'variable':
if family is None:
raise CreoleLoaderError(_('variable without family'))
is_follower = False
is_leader = False
if family.is_leader:
if child.attrib['name'] != family.attrib['name']:
is_follower = True
else:
is_leader = True
variable = self._populate_variable(child, subpath, is_follower, is_leader)
family.add(variable)
elif child.tag == 'property':
self._parse_properties(family, child)
else:
raise Exception('unknown tag {}'.format(child.tag))
def _parse_properties(self, family, child):
if child.get('type') == 'calculation':
kwargs = {'condition': child.attrib['source'],
'expected': ParamValue(child.attrib.get('expected'))}
if child.attrib['inverse'] == 'True':
kwargs['reverse_condition'] = ParamValue(True)
family.attrib['properties'].append((ParamValue(child.text), kwargs))
else:
family.attrib['properties'].append(child.text)
def _parse_separators(self, separators):
for separator in separators:
elt = self.storage.get(separator.attrib['name'])
never_hidden = separator.attrib.get('never_hidden')
if never_hidden == 'True':
never_hidden = True
else:
never_hidden = None
info = (separator.text, never_hidden)
self.separators[separator.attrib['name']] = info
elt.add_information('separator', info)
class ElementStorage:
def __init__(self):
self.paths = {}
def add(self, path, elt):
if path in self.paths:
raise CreoleLoaderError(_('path already loaded {}').format(path))
self.paths[path] = elt
def add_information(self, path, name, information):
elt = self.get(path)
elt.add_information(name, information)
def get(self, path):
if path not in self.paths:
raise CreoleLoaderError(_('there is no element for path {}').format(path))
return self.paths[path]
class Common:
def build_properties(self):
for index, prop in enumerate(self.attrib['properties']):
if isinstance(prop, tuple):
action, kwargs = prop
kwargs['condition'] = ParamOption(self.storage.get(kwargs['condition']).get(), todict=True)
prop = Calculation(calc_value,
Params(action,
kwargs=kwargs))
self.attrib['properties'][index] = prop
if self.attrib['properties']:
self.attrib['properties'] = tuple(self.attrib['properties'])
else:
del self.attrib['properties']
class Variable(Common):
def __init__(self, elt, booleans, storage, is_follower, is_leader, eosfunc):
self.option = None
self.informations = {}
self.attrib = {}
self.attrib['properties'] = []
self.attrib['validators'] = []
self.eosfunc = eosfunc
self.storage = storage
for key, value in elt.attrib.items():
if key in booleans:
if value == 'True':
value = True
elif value == 'False':
value = False
else:
raise CreoleLoaderError(_('unknown value {} for {}').format(value, key))
if key == 'help':
self.add_information(key, value)
elif key == 'type':
pass
else:
self.attrib[key] = value
convert_option = CONVERT_OPTION[elt.attrib['type']]
self.object_type = convert_option['opttype']
if elt.attrib['type'] == 'choice':
if self.attrib.get('choice'):
self.attrib['values'] = getattr(self.eosfunc, self.attrib.get('choice'))
else:
self.attrib['values'] = []
for child in elt:
if child.tag == 'choice':
value = child.text
if 'type' in child.attrib and child.attrib['type'] == 'number':
value = int(value)
if value is None:
value = u''
self.attrib['values'].append(value)
self.attrib['values'] = tuple(self.attrib['values'])
for child in elt:
if child.tag == 'property':
if child.get('type') == 'calculation':
kwargs = {'condition': child.attrib['source'],
'expected': ParamValue(child.attrib.get('expected'))}
if child.attrib['inverse'] == 'True':
kwargs['reverse_condition'] = ParamValue(True)
self.attrib['properties'].append((ParamValue(child.text), kwargs))
else:
self.attrib['properties'].append(child.text)
elif child.tag == 'value':
if child.attrib.get('type') == 'calculation':
if child.text.strip():
self.attrib['default'] = (child.text.strip(),)
else:
params = []
for param in child:
params.append(self.parse_param(param))
self.attrib['default'] = (child.attrib['name'], params, False)
else:
if "type" in child.attrib:
type_ = CONVERT_OPTION[child.attrib['type']]['opttype']
else:
type_ = self.object_type
if self.attrib['multi'] and not is_follower:
if 'default' not in self.attrib:
self.attrib['default'] = []
value = convert_tiramisu_value(child.text, type_)
self.attrib['default'].append(value)
if 'default_multi' not in self.attrib and not is_leader:
self.attrib['default_multi'] = value
else:
if 'default' in self.attrib:
raise CreoleLoaderError(_('default value already set for {}'
'').format(self.attrib['path']))
value = convert_tiramisu_value(child.text, type_)
if value is None: # and (elt.attrib['type'] != 'choice' or value not in self.attrib['values']):
value = u''
if is_follower:
self.attrib['default_multi'] = value
else:
self.attrib['default'] = value
elif child.tag == 'choice':
# already load
pass
elif child.tag == 'check':
params = []
for param in child:
params.append(self.parse_param(param))
#check.params = params
self.attrib['validators'].append((child.attrib['name'], params, child.attrib['warnings_only']))
else:
raise Exception('unknown tag {}'.format(child.tag))
if 'initkwargs' in convert_option:
self.attrib.update(convert_option['initkwargs'])
if elt.attrib['type'] == 'symlink':
del self.attrib['properties']
del self.attrib['multi']
self.attrib['opt'] = storage.get(self.attrib['opt'])
def parse_param(self, param):
name = param.attrib.get('name', '')
if param.attrib['type'] == 'string':
value = param.text
elif param.attrib['type'] == 'eole':
transitive = param.attrib.get('transitive', 'False')
if transitive == 'True':
transitive = True
elif transitive == 'False':
transitive = False
else:
raise CreoleLoaderError(_('unknown transitive boolean {}').format(transitive))
value = [param.text, transitive]
elif param.attrib['type'] == 'number':
value = int(param.text)
else:
raise CreoleLoaderError(_('unknown param type {}').format(param.attrib['type']))
return(name, value)
def add_information(self, key, value):
if key in self.informations:
raise CreoleLoaderError(_('key already exists in information {}').format(key))
self.informations[key] = value
def build_calculator(self, key):
if key in self.attrib:
values = self.attrib[key]
if isinstance(values, list):
is_list = True
else:
is_list = False
values = [values]
ret = []
for value in values:
if isinstance(value, tuple):
args = []
kwargs = {}
if len(value) == 3:
for param in value[1]:
if isinstance(param[1], list):
param_value = ParamOption(self.storage.get(param[1][0]).get(), notraisepropertyerror=param[1][1])
else:
param_value = ParamValue(param[1])
if not param[0]:
args.append(param_value)
else:
kwargs[param[0]] = param_value
ret.append(Calculation(getattr(self.eosfunc, value[0]),
Params(tuple(args),
kwargs=kwargs)))
else:
ret.append(value)
if not is_list:
self.attrib[key] = ret[0]
else:
self.attrib[key] = ret
def get(self):
if self.option is None:
if self.object_type is SymLinkOption:
self.attrib['opt'] = self.attrib['opt'].get()
else:
self.build_properties()
self.build_calculator('default')
self.build_calculator('validators')
if not self.attrib['validators']:
del self.attrib['validators']
try:
option = self.object_type(**self.attrib)
except Exception as err:
import traceback
traceback.print_exc()
name = self.attrib['name']
raise CreoleLoaderError(_('cannot create option {}: {}').format(name, err))
for key, value in self.informations.items():
option.impl_set_information(key, value)
self.option = option
return self.option
class Family(Common):
def __init__(self, elt, booleans, storage, force_icon=False):
self.option = None
self.attrib = {}
self.is_leader = False
if force_icon:
self.informations = {'icon': None}
else:
self.informations = {}
self.children = []
self.storage = storage
self.attrib['properties'] = []
for key, value in elt.attrib.items():
if key in booleans:
if value == 'True':
value = True
elif value == 'False':
value = False
else:
raise CreoleLoaderError(_('unknown value {} for {}').format(value, key))
if key == 'icon':
self.add_information('icon', value)
continue
elif key == 'hidden':
if value:
self.attrib['properties'].append(key)
elif key == 'mode':
self.attrib['properties'].append(value)
elif key == 'help':
self.add_information(key, value)
elif key == 'type':
pass
else:
self.attrib[key] = value
if 'doc' not in self.attrib:
self.attrib['doc'] = u''
def add(self, child):
self.children.append(child)
def add_information(self, key, value):
if key in self.informations and not (key == 'icon' and self.informations[key] is None):
raise CreoleLoaderError(_('key already exists in information {}').format(key))
self.informations[key] = value
def set_leader(self):
self.is_leader = True
def get(self):
if self.option is None:
self.attrib['children'] = []
for child in self.children:
self.attrib['children'].append(child.get())
self.build_properties()
try:
if not self.is_leader:
option = OptionDescription(**self.attrib)
else:
option = Leadership(**self.attrib)
#option = OptionDescription(**self.attrib)
except Exception as err:
raise CreoleLoaderError(_('cannot create optiondescription {}: {}').format(self.attrib['name'], err))
for key, value in self.informations.items():
option.impl_set_information(key, value)
self.option = option
#if self.is_leader:
# self.option.impl_set_group_type(groups.leader)
return self.option
def load(xmlroot: str,
dtd_path: str,
funcs_path: str):
tiramisu_objects = PopulateTiramisuObjects()
tiramisu_objects.parse_dtd(dtd_path)
tiramisu_objects.make_tiramisu_objects(xmlroot,
funcs_path)
return tiramisu_objects.storage.paths['.'].get()

693
src/rougail/objspace.py Normal file
View File

@ -0,0 +1,693 @@
"""
Creole flattener. Takes a bunch of Creole XML dispatched in differents folders
as an input and outputs a human readable flatened XML
Sample usage::
>>> from creole.objspace import CreoleObjSpace
>>> eolobj = CreoleObjSpace('/usr/share/creole/creole.dtd')
>>> eolobj.create_or_populate_from_xml('creole', ['/usr/share/eole/creole/dicos'])
>>> eolobj.space_visitor()
>>> eolobj.save('/tmp/creole_flatened_output.xml')
The CreoleObjSpace
- loads the XML into an internal CreoleObjSpace representation
- visits/annotates the objects
- dumps the object space as XML output into a single XML target
The visit/annotation stage is a complex step that corresponds to the Creole
procedures.
For example: a variable is redefined and shall be moved to another family
means that a variable1 = Variable() object in the object space who lives in the family1 parent
has to be moved in family2. The visit procedure changes the varable1's object space's parent.
"""
from collections import OrderedDict
from lxml.etree import Element, SubElement # pylint: disable=E0611
from json import dump
from .i18n import _
from .xmlreflector import XMLReflector, HIGH_COMPATIBILITY
from .annotator import ERASED_ATTRIBUTES, ActionAnnotator, ContainerAnnotator, SpaceAnnotator
from .utils import normalize_family
from .error import CreoleOperationError, SpaceObjShallNotBeUpdated, CreoleDictConsistencyError
# CreoleObjSpace's elements like 'family' or 'slave', that shall be forced to the Redefinable type
FORCE_REDEFINABLES = ('family', 'slave', 'container', 'disknod', 'variables', 'family_action')
# CreoleObjSpace's elements that shall be forced to the UnRedefinable type
FORCE_UNREDEFINABLES = ('value', 'input', 'profile', 'ewtapp', 'tag', 'saltaction')
# CreoleObjSpace's elements that shall be set to the UnRedefinable type
UNREDEFINABLE = ('multi', 'type')
PROPERTIES = ('hidden', 'frozen', 'auto_freeze', 'auto_save', 'force_default_on_freeze',
'force_store_value', 'disabled', 'mandatory')
CONVERT_PROPERTIES = {'auto_save': ['force_store_value'], 'auto_freeze': ['force_store_value', 'auto_freeze']}
RENAME_ATTIBUTES = {'description': 'doc'}
#TYPE_TARGET_CONDITION = ('variable', 'family')
# _____________________________________________________________________________
# special types definitions for the Object Space's internal representation
class RootCreoleObject(object):
""
class CreoleObjSpace(object):
"""DOM XML reflexion free internal representation of a Creole Dictionary
"""
choice = type('Choice', (RootCreoleObject,), OrderedDict())
property_ = type('Property', (RootCreoleObject,), OrderedDict())
# Creole ObjectSpace's Leadership variable class type
Leadership = type('Leadership', (RootCreoleObject,), OrderedDict())
"""
This Atom type stands for singleton, that is
an Object Space's atom object is present only once in the
object space's tree
"""
Atom = type('Atom', (RootCreoleObject,), OrderedDict())
"A variable that can't be redefined"
Redefinable = type('Redefinable', (RootCreoleObject,), OrderedDict())
"A variable can be redefined"
UnRedefinable = type('UnRedefinable', (RootCreoleObject,), OrderedDict())
def __init__(self, dtdfilename): # pylint: disable=R0912
self.index = 0
class ObjSpace(object): # pylint: disable=R0903
"""
Base object space
"""
self.space = ObjSpace()
self.xmlreflector = XMLReflector()
self.xmlreflector.parse_dtd(dtdfilename)
self.redefine_variables = None
self.probe_variables = []
# elt container's attrs list
self.container_elt_attr_list = [] #
# ['variable', 'separator', 'family']
self.forced_text_elts = set()
# ['disknod', 'follower', 'target', 'service', 'package', 'ip', 'value', 'tcpwrapper',
# 'interface', 'input', 'port']
self.forced_text_elts_as_name = set(['choice', 'property'])
self.forced_choice_option = {}
self.paths = Path()
self.list_conditions = {}
self.booleans_attributs = []
for elt in self.xmlreflector.dtd.iterelements():
attrs = {}
clstype = self.UnRedefinable
atomic = True
forced_text_elt = False
if elt.type == 'mixed':
forced_text_elt = True
if elt.name == 'container':
self.container_elt_attr_list = [elt.content.left.name]
self.parse_dtd_right_left_elt(elt.content)
for attr in elt.iterattributes():
atomic = False
if attr.default_value:
if attr.default_value == 'True':
default_value = True
elif attr.default_value == 'False':
default_value = False
else:
default_value = attr.default_value
attrs[attr.name] = default_value
if not attr.name.endswith('_type'):
values = list(attr.itervalues())
if values != []:
self.forced_choice_option.setdefault(elt.name, {})[attr.name] = values
if attr.name == 'redefine':
clstype = self.Redefinable
if attr.name == 'name' and forced_text_elt is True:
self.forced_text_elts.add(elt.name)
forced_text_elt = False
if set(attr.itervalues()) == set(['True', 'False']):
self.booleans_attributs.append(attr.name)
if forced_text_elt is True:
self.forced_text_elts_as_name.add(elt.name)
if elt.name in FORCE_REDEFINABLES:
clstype = self.Redefinable
elif elt.name in FORCE_UNREDEFINABLES:
clstype = self.UnRedefinable
elif atomic:
clstype = self.Atom
# Creole ObjectSpace class types, it enables us to create objects like:
# Service_restriction(), Ip(), Interface(), Host(), Fstab(), Package(), Disknod(),
# File(), Variables(), Family(), Variable(), Separators(), Separator(), Value(),
# Constraints()... and so on. Creole ObjectSpace is an object's reflexion of
# the XML elements
setattr(self, elt.name, type(elt.name.capitalize(), (clstype,), attrs))
def parse_dtd_right_left_elt(self, elt):
if elt.right.type == 'or':
self.container_elt_attr_list.append(elt.right.left.name)
self.parse_dtd_right_left_elt(elt.right)
else:
self.container_elt_attr_list.append(elt.right.name)
def _convert_boolean(self, value): # pylint: disable=R0201
"""Boolean coercion. The Creole XML may contain srings like `True` or `False`
"""
if isinstance(value, bool):
return value
if value == 'True':
return True
elif value == 'False':
return False
else:
raise TypeError(_('{} is not True or False').format(value)) # pragma: no cover
def _is_already_exists(self, name, space, child, namespace):
if isinstance(space, self.family): # pylint: disable=E1101
if namespace != 'creole':
name = space.path + '.' + name
return self.paths.path_is_defined(name)
if child.tag in ['family', 'family_action']:
norm_name = normalize_family(name)
else:
norm_name = name
return norm_name in getattr(space, child.tag, {})
def _translate_in_space(self, name, family, variable, namespace):
if not isinstance(family, self.family): # pylint: disable=E1101
if variable.tag in ['family', 'family_action']:
norm_name = normalize_family(name)
else:
norm_name = name
return getattr(family, variable.tag)[norm_name]
if namespace == 'creole':
path = name
else:
path = family.path + '.' + name
old_family_name = self.paths.get_variable_family_name(path)
if normalize_family(family.name) == old_family_name:
return getattr(family, variable.tag)[name]
old_family = self.space.variables['creole'].family[old_family_name] # pylint: disable=E1101
variable_obj = old_family.variable[name]
del old_family.variable[name]
if 'variable' not in vars(family):
family.variable = OrderedDict()
family.variable[name] = variable_obj
self.paths.append('variable', name, namespace, family.name, variable_obj)
return variable_obj
def remove_check(self, name): # pylint: disable=C0111
if hasattr(self.space, 'constraints') and hasattr(self.space.constraints, 'check'):
remove_checks = []
for idx, check in enumerate(self.space.constraints.check): # pylint: disable=E1101
if hasattr(check, 'target') and check.target == name:
remove_checks.append(idx)
remove_checks = list(set(remove_checks))
remove_checks.sort(reverse=True)
for idx in remove_checks:
self.space.constraints.check.pop(idx) # pylint: disable=E1101
def remove_condition(self, name): # pylint: disable=C0111
for idx, condition in enumerate(self.space.constraints.condition): # pylint: disable=E1101
remove_targets = []
if hasattr(condition, 'target'):
for target_idx, target in enumerate(condition.target):
if target.name == name:
remove_targets.append(target_idx)
remove_targets = list(set(remove_targets))
remove_targets.sort(reverse=True)
for idx in remove_targets:
del condition.target[idx]
def create_or_update_space_object(self, subspace, space, child, namespace):
"""Creates or retrieves the space object that corresponds
to the `child` XML object
Two attributes of the `child` XML object are important:
- with the `redefine` boolean flag attribute we know whether
the corresponding space object shall be created or updated
- `True` means that the corresponding space object shall be updated
- `False` means that the corresponding space object shall be created
- with the `exists` boolean flag attribute we know whether
the corresponding space object shall be created
(or nothing -- that is the space object isn't modified)
- `True` means that the corresponding space object shall be created
- `False` means that the corresponding space object is not updated
In the special case `redefine` is True and `exists` is False,
we create the corresponding space object if it doesn't exist
and we update it if it exists.
:return: the corresponding space object of the `child` XML object
"""
if child.tag in self.forced_text_elts_as_name:
name = child.text
else:
name = subspace['name']
if self._is_already_exists(name, space, child, namespace):
if child.tag in FORCE_REDEFINABLES:
redefine = self._convert_boolean(subspace.get('redefine', True))
else:
redefine = self._convert_boolean(subspace.get('redefine', False))
exists = self._convert_boolean(subspace.get('exists', True))
if redefine is True:
return self._translate_in_space(name, space, child, namespace)
elif exists is False:
raise SpaceObjShallNotBeUpdated()
else:
raise CreoleDictConsistencyError(_('Already present in another XML file, {} '
'cannot be re-created').format(name))
else:
redefine = self._convert_boolean(subspace.get('redefine', False))
exists = self._convert_boolean(subspace.get('exists', False))
if redefine is False or exists is True:
return getattr(self, child.tag)()
else:
raise CreoleDictConsistencyError(_('Redefined object: '
'{} does not exist yet').format(name))
def generate_creoleobj(self, child, space, namespace):
"""
instanciates or creates Creole Object Subspace objects
"""
if issubclass(getattr(self, child.tag), self.Redefinable):
creoleobj = self.create_or_update_space_object(child.attrib, space, child, namespace)
else:
# instanciates an object from the CreoleObjSpace's builtins types
# example : child.tag = constraints -> a self.Constraints() object is created
creoleobj = getattr(self, child.tag)()
# this Atom instance has to be a singleton here
# we do not re-create it, we reuse it
if isinstance(creoleobj, self.Atom) and child.tag in vars(space):
creoleobj = getattr(space, child.tag)
self.create_tree_structure(space, child, creoleobj)
return creoleobj
def create_tree_structure(self, space, child, creoleobj): # pylint: disable=R0201
"""
Builds the tree structure of the object space here
we set containers attributes in order to be populated later on
for example::
space = Family()
space.variable = OrderedDict()
another example:
space = Variable()
space.value = list()
"""
if child.tag not in vars(space):
if isinstance(creoleobj, self.Redefinable):
setattr(space, child.tag, OrderedDict())
elif isinstance(creoleobj, self.UnRedefinable):
setattr(space, child.tag, [])
elif isinstance(creoleobj, self.Atom):
pass
else: # pragma: no cover
raise CreoleOperationError(_("Creole object {} "
"has a wrong type").format(type(creoleobj)))
def _add_to_tree_structure(self, creoleobj, space, child): # pylint: disable=R0201
if isinstance(creoleobj, self.Redefinable):
name = creoleobj.name
if child.tag == 'family' or child.tag == 'family_action':
name = normalize_family(name)
getattr(space, child.tag)[name] = creoleobj
elif isinstance(creoleobj, self.UnRedefinable):
getattr(space, child.tag).append(creoleobj)
else:
setattr(space, child.tag, creoleobj)
def _set_text_to_obj(self, child, creoleobj):
if child.text is None:
text = None
else:
text = child.text.strip()
if text:
if child.tag in self.forced_text_elts_as_name:
creoleobj.name = text
else:
creoleobj.text = text
def _set_xml_attributes_to_obj(self, child, creoleobj):
redefine = self._convert_boolean(child.attrib.get('redefine', False))
has_value = hasattr(creoleobj, 'value')
if HIGH_COMPATIBILITY and has_value:
has_value = len(child) != 1 or child[0].text != None
if (redefine is True and child.tag == 'variable' and has_value
and len(child) != 0):
del creoleobj.value
for attr, val in child.attrib.items():
if redefine and attr in UNREDEFINABLE:
# UNREDEFINABLE concerns only 'variable' node so we can fix name
# to child.attrib['name']
name = child.attrib['name']
raise CreoleDictConsistencyError(_("cannot redefine attribute {} for variable {}").format(attr, name))
if isinstance(getattr(creoleobj, attr, None), bool):
if val == 'False':
val = False
elif val == 'True':
val = True
else: # pragma: no cover
raise CreoleOperationError(_('value for {} must be True or False, '
'not {}').format(attr, val))
if not (attr == 'name' and getattr(creoleobj, 'name', None) != None):
setattr(creoleobj, attr, val)
def _creoleobj_tree_visitor(self, child, creoleobj, namespace):
"""Creole object tree manipulations
"""
if child.tag == 'variable' and child.attrib.get('remove_check', False):
self.remove_check(creoleobj.name)
if child.tag == 'variable' and child.attrib.get('remove_condition', False):
self.remove_condition(creoleobj.name)
if child.tag in ['auto', 'fill', 'check']:
variable_name = child.attrib['target']
# XXX not working with variable not in creole and in leader/followers
if variable_name in self.redefine_variables:
creoleobj.redefine = True
else:
creoleobj.redefine = False
if not hasattr(creoleobj, 'index'):
creoleobj.index = self.index
if child.tag in ['auto', 'fill', 'condition', 'check', 'action']:
creoleobj.namespace = namespace
def xml_parse_document(self, document, space, namespace, is_in_family=False):
"""Parses a Creole XML file
populates the CreoleObjSpace
"""
family_names = []
for child in document:
# this index enables us to reorder the 'fill' and 'auto' objects
self.index += 1
# doesn't proceed the XML commentaries
if not isinstance(child.tag, str):
continue
if child.tag == 'family':
is_in_family = True
if child.attrib['name'] in family_names:
raise CreoleDictConsistencyError(_('Family {} is set several times').format(child.attrib['name']))
family_names.append(child.attrib['name'])
if child.tag == 'variables':
child.attrib['name'] = namespace
if HIGH_COMPATIBILITY and child.tag == 'value' and child.text == None:
continue
# creole objects creation
try:
creoleobj = self.generate_creoleobj(child, space, namespace)
except SpaceObjShallNotBeUpdated:
continue
self._set_text_to_obj(child, creoleobj)
self._set_xml_attributes_to_obj(child, creoleobj)
self._creoleobj_tree_visitor(child, creoleobj, namespace)
self._fill_creoleobj_path_attribute(space, child, namespace, document, creoleobj)
self._add_to_tree_structure(creoleobj, space, child)
if list(child) != []:
self.xml_parse_document(child, creoleobj, namespace, is_in_family)
def _fill_creoleobj_path_attribute(self, space, child, namespace, document, creoleobj): # pylint: disable=R0913
"""Fill self.paths attributes
"""
if not isinstance(space, self.help): # pylint: disable=E1101
if child.tag == 'variable':
family_name = normalize_family(document.attrib['name'])
self.paths.append('variable', child.attrib['name'], namespace, family_name,
creoleobj)
if child.attrib.get('redefine', 'False') == 'True':
if namespace == 'creole':
self.redefine_variables.append(child.attrib['name'])
else:
self.redefine_variables.append(namespace + '.' + family_name + '.' +
child.attrib['name'])
if child.tag == 'family':
family_name = normalize_family(child.attrib['name'])
if namespace != 'creole':
family_name = namespace + '.' + family_name
self.paths.append('family', family_name, namespace, creoleobj=creoleobj)
creoleobj.path = self.paths.get_family_path(family_name, namespace)
def create_or_populate_from_xml(self, namespace, xmlfolders, from_zephir=None):
"""Parses a bunch of XML files
populates the CreoleObjSpace
"""
documents = self.xmlreflector.load_xml_from_folders(xmlfolders, from_zephir)
for xmlfile, document in documents:
try:
self.redefine_variables = []
self.xml_parse_document(document, self.space, namespace)
except Exception as err:
#print(_('error in XML file {}').format(xmlfile))
raise err
def populate_from_zephir(self, namespace, xmlfile):
self.redefine_variables = []
document = self.xmlreflector.parse_xmlfile(xmlfile, from_zephir=True, zephir2=True)
self.xml_parse_document(document, self.space, namespace)
def space_visitor(self, eosfunc_file): # pylint: disable=C0111
ActionAnnotator(self)
ContainerAnnotator(self)
SpaceAnnotator(self, eosfunc_file)
def save(self, filename, force_no_save=False):
"""Save an XML output on disk
:param filename: the full XML filename
"""
xml = Element('creole')
self._xml_export(xml, self.space)
if not force_no_save:
self.xmlreflector.save_xmlfile(filename, xml)
return xml
def save_probes(self, filename, force_no_save=False):
"""Save an XML output on disk
:param filename: the full XML filename
"""
ret = {}
for variable in self.probe_variables:
args = []
kwargs = {}
if hasattr(variable, 'param'):
for param in variable.param:
list_param = list(vars(param).keys())
if 'index' in list_param:
list_param.remove('index')
if list_param == ['text']:
args.append(param.text)
elif list_param == ['text', 'name']:
kwargs[param.name] = param.text
else:
print(vars(param))
raise Exception('hu?')
ret[variable.target] = {'function': variable.name,
'args': args,
'kwargs': kwargs}
if not force_no_save:
with open(filename, 'w') as fhj:
dump(ret, fhj)
return ret
def _get_attributes(self, space): # pylint: disable=R0201
for attr in dir(space):
if not attr.startswith('_'):
yield attr
def _sub_xml_export(self, name, node, node_name, space, current_space):
if isinstance(space, dict):
space = list(space.values())
if isinstance(space, list):
for subspace in space:
if isinstance(subspace, self.Leadership):
_name = 'leader'
subspace.doc = subspace.variable[0].description
#subspace.doc = 'Leadership {}'.format(subspace.name)
else:
_name = name
if name in ['containers', 'variables', 'actions']:
_name = 'family'
if HIGH_COMPATIBILITY and not hasattr(subspace, 'doc'):
subspace.doc = ''
if _name == 'value' and (not hasattr(subspace, 'name') or subspace.name is None):
continue
child_node = SubElement(node, _name)
self._xml_export(child_node, subspace, _name)
elif isinstance(space, self.Atom):
if name == 'containers':
child_node = SubElement(node, 'family')
child_node.attrib['name'] = name
else:
child_node = SubElement(node, name)
for subname in self._get_attributes(space):
subspace = getattr(space, subname)
self._sub_xml_export(subname, child_node, name, subspace, space)
elif isinstance(space, self.Redefinable):
child_node = SubElement(node, 'family')
child_node.attrib['name'] = name
for subname in self._get_attributes(space):
subspace = getattr(space, subname)
self._sub_xml_export(subname, child_node, name, subspace, space)
else:
# FIXME plutot dans annotator ...
if name in PROPERTIES and node.tag in ['variable', 'family', 'leader']:
if space is True:
for prop in CONVERT_PROPERTIES.get(name, [name]):
SubElement(node, 'property').text = prop
elif name not in ERASED_ATTRIBUTES:
if name == 'name' and node_name in self.forced_text_elts_as_name and not hasattr(current_space, 'param'):
if isinstance(space, str):
node.text = space
else:
node.text = str(space)
elif name == 'text' and node_name in self.forced_text_elts:
node.text = space
elif node.tag == 'family' and name == 'name':
if 'doc' not in node.attrib.keys():
node.attrib['doc'] = space
node.attrib['name'] = normalize_family(space, check_name=False)
elif node.tag in ['variable', 'family', 'leader'] and name == 'mode':
if space is not None:
SubElement(node, 'property').text = space
else:
if name in RENAME_ATTIBUTES:
name = RENAME_ATTIBUTES[name]
if space is not None:
node.attrib[name] = str(space)
def _xml_export(self, node, space, node_name='creole'):
for name in self._get_attributes(space):
subspace = getattr(space, name)
self._sub_xml_export(name, node, node_name, subspace, space)
class Path(object):
"""Helper class to handle the `path` attribute of a CreoleObjSpace
instance.
sample: path="creole.general.condition"
"""
def __init__(self):
self.variables = {}
self.families = {}
def append(self, pathtype, name, namespace, family=None, creoleobj=None): # pylint: disable=C0111
if pathtype == 'family':
self.families[name] = dict(name=name, namespace=namespace, creoleobj=creoleobj)
elif pathtype == 'variable':
if namespace == 'creole':
varname = name
else:
if '.' in name:
varname = name
else:
varname = '.'.join([namespace, family, name])
self.variables[varname] = dict(name=name, family=family, namespace=namespace,
leader=None, creoleobj=creoleobj)
else: # pragma: no cover
raise Exception('unknown pathtype {}'.format(pathtype))
def get_family_path(self, name, current_namespace): # pylint: disable=C0111
if current_namespace is None: # pragma: no cover
raise CreoleOperationError('current_namespace must not be None')
dico = self.families[normalize_family(name, check_name=False)]
if dico['namespace'] != 'creole' and current_namespace != dico['namespace']:
raise CreoleDictConsistencyError(_('A family located in the {} namespace '
'shall not be used in the {} namespace').format(
dico['namespace'], current_namespace))
path = dico['name']
if dico['namespace'] is not None and '.' not in dico['name']:
path = '.'.join([dico['namespace'], path])
return path
def get_family_namespace(self, name): # pylint: disable=C0111
dico = self.families[name]
if dico['namespace'] is None:
return dico['name']
return dico['namespace']
def get_family_obj(self, name): # pylint: disable=C0111
if name not in self.families:
raise CreoleDictConsistencyError(_('unknown family {}').format(name))
dico = self.families[name]
return dico['creoleobj']
def get_variable_name(self, name): # pylint: disable=C0111
dico = self._get_variable(name)
return dico['name']
def get_variable_obj(self, name): # pylint: disable=C0111
dico = self._get_variable(name)
return dico['creoleobj']
def get_variable_family_name(self, name): # pylint: disable=C0111
dico = self._get_variable(name)
return dico['family']
def get_variable_family_path(self, name): # pylint: disable=C0111
dico = self._get_variable(name)
list_path = [dico['namespace'], dico['family']]
if dico['leader'] is not None:
list_path.append(dico['leader'])
return '.'.join(list_path)
def get_variable_namespace(self, name): # pylint: disable=C0111
return self._get_variable(name)['namespace']
def get_variable_path(self, name, current_namespace, allow_source=False): # pylint: disable=C0111
if current_namespace is None: # pragma: no cover
raise CreoleOperationError('current_namespace must not be None')
dico = self._get_variable(name)
if not allow_source:
if dico['namespace'] != 'creole' and current_namespace != dico['namespace']:
raise CreoleDictConsistencyError(_('A variable located in the {} namespace '
'shall not be used in the {} namespace').format(
dico['namespace'], current_namespace))
if '.' in dico['name']:
return dico['name']
list_path = [dico['namespace'], dico['family']]
if dico['leader'] is not None:
list_path.append(dico['leader'])
list_path.append(dico['name'])
return '.'.join(list_path)
def path_is_defined(self, name): # pylint: disable=C0111
return name in self.variables
def set_leader(self, name, leader): # pylint: disable=C0111
dico = self._get_variable(name)
namespace = dico['namespace']
if dico['leader'] != None:
raise CreoleDictConsistencyError(_('Already defined leader {} for variable'
' {}'.format(dico['leader'], name)))
dico['leader'] = leader
if namespace != 'creole':
new_path = self.get_variable_path(name, namespace)
self.append('variable', new_path, namespace, family=dico['family'], creoleobj=dico['creoleobj'])
self.variables[new_path]['leader'] = leader
del self.variables[name]
def _get_variable(self, name):
if name not in self.variables:
if name.startswith('creole.'):
name = name.split('.')[-1]
if name not in self.variables:
raise CreoleDictConsistencyError(_('unknown option {}').format(name))
return self.variables[name]
def get_leader(self, name): # pylint: disable=C0111
dico = self._get_variable(name)
return dico['leader']

442
src/rougail/template.py Normal file
View File

@ -0,0 +1,442 @@
# -*- coding: utf-8 -*-
"""
Gestion du mini-langage de template
On travaille sur les fichiers cibles
"""
import imp
import sys
from shutil import copy
import logging
from typing import Dict
from subprocess import call
from os import listdir, unlink
from os.path import basename, join, split, isfile
from tempfile import mktemp
from Cheetah import Parser
# l'encoding du template est déterminé par une regexp (encodingDirectiveRE dans Parser.py)
# il cherche un ligne qui ressemble à '#encoding: utf-8
# cette classe simule le module 're' et retourne toujours l'encoding utf-8
# 6224
class FakeEncoding():
def groups(self):
return ('utf-8',)
def search(self, *args):
return self
Parser.encodingDirectiveRE = FakeEncoding()
from Cheetah.Template import Template as ChtTemplate
from Cheetah.NameMapper import NotFound as CheetahNotFound
from tiramisu import Config
from .config import patch_dir, templatedir, distrib_dir
from .error import FileNotFound, TemplateError, TemplateDisabled
from .i18n import _
log = logging.getLogger(__name__)
log.addHandler(logging.NullHandler())
class IsDefined(object):
"""
filtre permettant de ne pas lever d'exception au cas où
la variable Creole n'est pas définie
"""
def __init__(self, context):
self.context = context
def __call__(self, varname):
if '.' in varname:
splitted_var = varname.split('.')
if len(splitted_var) != 2:
msg = _("Group variables must be of type master.slave")
raise KeyError(msg)
master, slave = splitted_var
if master in self.context:
return slave in self.context[master].slave.keys()
return False
else:
return varname in self.context
class CreoleGet(object):
def __init__(self, context):
self.context = context
def __call__(self, varname):
return self.context[varname]
def __getitem__(self, varname):
"""For bracket and dotted notation
"""
return self.context[varname]
def __contains__(self, varname):
"""Check variable existence in context
"""
return varname in self.context
@classmethod
def cl_compile(kls, *args, **kwargs):
kwargs['compilerSettings'] = {'directiveStartToken' : '%',
'cheetahVarStartToken' : '%%',
'EOLSlurpToken' : '%',
'PSPStartToken' : 'µ' * 10,
'PSPEndToken' : 'µ' * 10,
'commentStartToken' : 'µ' * 10,
'commentEndToken' : 'µ' * 10,
'multiLineCommentStartToken' : 'µ' * 10,
'multiLineCommentEndToken' : 'µ' * 10}
return kls.old_compile(*args, **kwargs)
ChtTemplate.old_compile = ChtTemplate.compile
ChtTemplate.compile = cl_compile
class CreoleClient:
def __init__(self,
config: Config):
self.config = config
class CheetahTemplate(ChtTemplate):
"""classe pour personnaliser et faciliter la construction
du template Cheetah
"""
def __init__(self,
filename: str,
context,
eosfunc: Dict,
config: Config,
current_container: str):
"""Initialize Creole CheetahTemplate
"""
ChtTemplate.__init__(self, file=filename,
searchList=[context, eosfunc, {'is_defined' : IsDefined(context),
'creole_client' : CreoleClient(config),
'current_container':CreoleGet(current_container),
}])
class CreoleMaster(object):
def __init__(self, value, slave=None, index=None):
"""
On rend la variable itérable pour pouvoir faire:
for ip in iplist:
print ip.network
print ip.netmask
print ip
index is used for CreoleLint
"""
self._value = value
if slave is not None:
self.slave = slave
else:
self.slave = {}
self._index = index
def __getattr__(self, name):
"""Get slave variable or attribute of master value.
If the attribute is a name of a slave variable, return its value.
Otherwise, returns the requested attribute of master value.
"""
if name in self.slave:
value = self.slave[name]
if isinstance(value, Exception):
raise value
return value
else:
return getattr(self._value, name)
def __getitem__(self, index):
"""Get a master.slave at requested index.
"""
ret = {}
for key, values in self.slave.items():
ret[key] = values[index]
return CreoleMaster(self._value[index], ret, index)
def __iter__(self):
"""Iterate over master.slave.
Return synchronised value of master.slave.
"""
for i in range(len(self._value)):
ret = {}
for key, values in self.slave.items():
ret[key] = values[i]
yield CreoleMaster(self._value[i], ret, i)
def __len__(self):
"""Delegate to master value
"""
return len(self._value)
def __repr__(self):
"""Show CreoleMaster as dictionary.
The master value is stored under 'value' key.
The slaves are stored under 'slave' key.
"""
return repr({'value': self._value, 'slave': self.slave})
def __eq__(self, value):
return value == self._value
def __ne__(self, value):
return value != self._value
def __lt__(self, value):
return self._value < value
def __le__(self, value):
return self._value <= value
def __gt__(self, value):
return self._value > value
def __ge__(self, value):
return self._value >= value
def __str__(self):
"""Delegate to master value
"""
return str(self._value)
def __add__(self, val):
return self._value.__add__(val)
def __radd__(self, val):
return val + self._value
def __contains__(self, item):
return item in self._value
def add_slave(self, name, value):
"""Add a slave variable
Minimal check on type and value of the slave in regards to the
master one.
@param name: name of the slave variable
@type name: C{str}
@param value: value of the slave variable
"""
if isinstance(self._value, list):
if not isinstance(value, list):
raise TypeError
elif len(value) != len(self._value):
raise ValueError(_('length mismatch'))
new_value = []
for val in value:
if isinstance(val, dict):
new_value.append(ValueError(val['err']))
else:
new_value.append(val)
value = new_value
elif isinstance(value, list):
raise TypeError
self.slave[name] = value
class CreoleTemplateEngine:
"""Engine to process Creole cheetah template
"""
def __init__(self,
config: Config,
eosfunc_file: str):
self.config = config
eos = {}
eosfunc = imp.load_source('eosfunc', eosfunc_file)
for func in dir(eosfunc):
if not func.startswith('_'):
eos[func] = getattr(eosfunc, func)
self.eosfunc = eos
self.creole_variables_dict = {}
self.load_eole_variables(self.config.option('creole'))
def load_eole_variables(self, optiondescription):
# remplacement des variables EOLE
for option in optiondescription.list('all'):
if option.option.isoptiondescription():
if option.option.isleadership():
print('leadership')
raise Exception('a faire')
else:
self.load_eole_variables(option)
else:
self.creole_variables_dict[option.option.name()] = option.value.get()
#if varname.find('.') != -1:
# #support des groupes
# mastername, slavename = varname.split('.')
# if not mastername in self.creole_variables_dict or not \
# isinstance(self.creole_variables_dict [mastername],
# CreoleMaster):
# # Create the master variable
# if mastername in values:
# self.creole_variables_dict[mastername] = CreoleMaster(values[mastername])
# else:
# #only for CreoleLint
# self.creole_variables_dict[mastername] = CreoleMaster(value)
# #test only for CreoleLint
# if mastername != slavename:
# self.creole_variables_dict[mastername].add_slave(slavename, value)
#else:
# self.creole_variables_dict[varname] = value
def patch_template(self,
filename: str):
"""Apply patch to a template
"""
patch_cmd = ['patch', '-d', templatedir, '-N', '-p1']
patch_no_debug = ['-s', '-r', '-', '--backup-if-mismatch']
# patches variante + locaux
for directory in [join(patch_dir, 'variante'), patch_dir]:
patch_file = join(directory, f'{filename}.patch')
if isfile(patch_file):
log.info(_("Patching template '{filename}' with '{patch_file}'"))
ret = call(patch_cmd + patch_no_debug + ['-i', patch_file])
if ret:
patch_cmd_err = ' '.join(patch_cmd + ['-i', patch_file])
log.error(_(f"Error applying patch: '{patch_file}'\nTo reproduce and fix this error {patch_cmd_err}"))
copy(filename, templatedir)
def strip_template_comment(self,
filename: str):
"""Strip comment from template
This apply if filevar has a del_comment attribut
"""
# suppression des commentaires si demandé (attribut del_comment)
if 'del_comment' in filevar and filevar['del_comment'] != '':
strip_cmd = ['sed', '-i']
log.info(_("Cleaning file '{0}'").format( filevar['source'] ))
raise Exception('hu')
#ret, out, err = pyeole.process.system_out(strip_cmd
# + ['/^\s*{0}/d ; /^$/d'.format(filevar['del_comment']),
# filevar['source'] ])
#if ret != 0:
# msg = _("Error removing comments '{0}': {1}")
# raise TemplateError(msg.format(filevar['del_comment'], err))
def prepare_template(self,
filename: str):
"""Prepare template source file
"""
log.info(_("Copy template: '{filename}' -> '{templatedir}'"))
copy(filename, templatedir)
self.patch_template(filename)
# self.strip_template_comment(filename)
def process(self,
filevar: Dict,
container: str):
"""Process a cheetah template
"""
# full path of the destination file
destfilename = join(dest_dir, filevar['source'])
log.info(_(f"Cheetah processing: '{destfilename}'"))
try:
cheetah_template = CheetahTemplate(join(templatedir, filevar['source']),
self.creole_variables_dict,
self.eosfunc,
self.config.config.copy(),
container)
data = str(cheetah_template)
except CheetahNotFound as err:
varname = err.args[0][13:-1]
raise TemplateError(_(f"Error: unknown variable used in template {destfilename} : {varname}"))
except Exception as err:
raise TemplateError(_(f"Error while instantiating template {destfilename}: {err}"))
with open(destfilename, 'w') as file_h:
file_h.write(data)
def change_properties(self,
filevar: Dict):
destfilename = join(dest_dir, filevar['source'])
#chowncmd = ['chown']
#chownarg = ''
chmodcmd = ['chmod']
chmodarg = ''
#if 'owner' in filevar and filevar['owner']:
# chownarg = filevar['owner']
#else:
# chownarg = 'root'
#if 'group' in filevar and filevar['group']:
# chownarg += ":" + filevar['group']
#else:
# chownarg += ':root'
if 'mode' in filevar and filevar['mode']:
chmodarg = filevar['mode']
else:
chmodarg = '0644'
#chowncmd.extend( [chownarg, destfilename] )
chmodcmd.extend([chmodarg, destfilename])
#log.info(_('Changing properties: {0}').format(' '.join(chowncmd)) )
#ret = call(chowncmd)
#if ret:
# log.error(_('Error changing properties {0}: {1}').format(ret, err) )
log.info(_('Changing properties: {0}').format(' '.join(chmodcmd)) )
ret = call(chmodcmd)
if ret:
chmod_cmd = ' '.join(chmodcmd)
log.error(_(f'Error changing properties: {chmodcmd}'))
def instance_file(self,
filevar: Dict,
container: str):
"""Run templatisation on one file of one container
"""
log.info(_("Instantiating file '{filename}'"))
self.process(filevar,
container)
self.change_properties(filevar)
def instance_files(self,
container=None):
"""Run templatisation on all files of all containers
@param container: name of a container
@type container: C{str}
"""
for template in listdir(distrib_dir):
self.prepare_template(join(distrib_dir, template))
for container_obj in self.config.option('containers').list('all'):
current_container = container_obj.option.doc()
if container is not None and container != current_container:
continue
for fills in container_obj.list('all'):
if fills.option.name() == 'files':
for fill_obj in fills.list('all'):
fill = fill_obj.value.dict()
filename = fill['source']
if not isfile(join(distrib_dir, filename)):
raise FileNotFound(_(f"File {filename} does not exist."))
print(fill)
if fill['activate']:
self.instance_file(fill, current_container)
else:
log.debug(_("Instantiation of file '{filename}' disabled"))
def generate(config: Config,
eosfunc_file: str,
container: str=None):
engine = CreoleTemplateEngine(config,
eosfunc_file)
engine.instance_files(container=container)

26
src/rougail/utils.py Normal file
View File

@ -0,0 +1,26 @@
"""
utilitaires créole
"""
import unicodedata
# définition des classes d'adresse IP existantes
def normalize_family(family_name, check_name=True):
"""
il ne faut pas d'espace, d'accent, de majuscule, de tiré, ...
dans le nom des familles
"""
f = family_name
f = f.replace('-', '_')
#f = f.replace(u'é', 'e')
#f = f.replace(u'è', 'e')
nfkd_form = unicodedata.normalize('NFKD', f)
f = u"".join([c for c in nfkd_form if not unicodedata.combining(c)])
f = f.replace(' ', '_')
f = f.lower()
if f[0].isnumeric():
raise ValueError(u'Le nom de la famille ne doit pas commencer par un chiffre : {0}'.format(f))
if check_name and f == 'containers':
raise ValueError(u'nom de la famille interdit {0}'.format(f))
return f

161
src/rougail/xml_compare.py Normal file
View File

@ -0,0 +1,161 @@
try:
import doctest
doctest.OutputChecker
except (AttributeError, ImportError): # Python < 2.4
import util.doctest24 as doctest
try:
import xml.etree.ElementTree as ET
except ImportError:
import elementtree.ElementTree as ET
from xml.parsers.expat import ExpatError as XMLParseError
RealOutputChecker = doctest.OutputChecker
def debug(*msg):
import sys
print >> sys.stderr, ' '.join(map(str, msg))
class HTMLOutputChecker(RealOutputChecker):
def check_output(self, want, got, optionflags):
normal = RealOutputChecker.check_output(self, want, got, optionflags)
if normal or not got:
return normal
try:
want_xml = make_xml(want)
except XMLParseError:
pass
else:
try:
got_xml = make_xml(got)
except XMLParseError:
pass
else:
if xml_compare(want_xml, got_xml):
return True
return False
def output_difference(self, example, got, optionflags):
actual = RealOutputChecker.output_difference(
self, example, got, optionflags)
want_xml = got_xml = None
try:
want_xml = make_xml(example.want)
want_norm = make_string(want_xml)
except XMLParseError as e:
if example.want.startswith('<'):
want_norm = '(bad XML: %s)' % e
# '<xml>%s</xml>' % example.want
else:
return actual
try:
got_xml = make_xml(got)
got_norm = make_string(got_xml)
except XMLParseError as e:
if example.want.startswith('<'):
got_norm = '(bad XML: %s)' % e
else:
return actual
s = '%s\nXML Wanted: %s\nXML Got : %s\n' % (
actual, want_norm, got_norm)
if got_xml and want_xml:
result = []
xml_compare(want_xml, got_xml, result.append)
s += 'Difference report:\n%s\n' % '\n'.join(result)
return s
def xml_sort(children):
tcl1 = {}
#idx = 0
for child in children:
if 'name' in child.attrib:
key = child.attrib['name']
else:
key = child.tag
if key not in tcl1:
tcl1[key] = []
tcl1[key].append(child)
cl1_keys = list(tcl1.keys())
cl1_keys.sort()
cl1 = []
for key in cl1_keys:
cl1.extend(tcl1[key])
return cl1
def xml_compare(x1, x2):
if x1.tag != x2.tag:
print ('Tags do not match: %s and %s' % (x1.tag, x2.tag))
return False
for name, value in x1.attrib.items():
if x2.attrib.get(name) != value:
print ('Attributes do not match: %s=%r, %s=%r'
% (name, value, name, x2.attrib.get(name)))
return False
for name in x2.attrib:
if name not in x1.attrib:
print ('x2 has an attribute x1 is missing: %s'
% name)
return False
if not text_compare(x1.text, x2.text):
print ('text: %r != %r' % (x1.text, x2.text))
return False
if not text_compare(x1.tail, x2.tail):
print ('tail: %r != %r' % (x1.tail, x2.tail))
return False
cl1 = xml_sort(x1.getchildren())
cl2 = xml_sort(x2.getchildren())
if len(cl1) != len(cl2):
cl1_tags = []
for c in cl1:
cl1_tags.append(c.tag)
cl2_tags = []
for c in cl2:
cl2_tags.append(c.tag)
print ('children length differs, %i != %i (%s != %s)'
% (len(cl1), len(cl2), cl1_tags, cl2_tags))
return False
i = 0
for c1, c2 in zip(cl1, cl2):
i += 1
if not xml_compare(c1, c2):
if 'name' in c1.attrib:
name = c1.attrib['name']
else:
name = i
print ('in tag "%s" with name "%s"'
% (c1.tag, name))
return False
return True
def text_compare(t1, t2):
if not t1 and not t2:
return True
if t1 == '*' or t2 == '*':
return True
return (t1 or '').strip() == (t2 or '').strip()
def make_xml(s):
return ET.XML('<xml>%s</xml>' % s)
def make_string(xml):
if isinstance(xml, (str, unicode)):
xml = make_xml(xml)
s = ET.tostring(xml)
if s == '<xml />':
return ''
assert s.startswith('<xml>') and s.endswith('</xml>'), repr(s)
return s[5:-6]
def install():
doctest.OutputChecker = HTMLOutputChecker

View File

@ -0,0 +1,96 @@
# coding: utf-8
from os.path import join, isfile, basename, isdir
from os import listdir
#from io import BytesIO
from lxml.etree import DTD, parse, tostring # , XMLParser
from .i18n import _
from .error import CreoleDictConsistencyError
HIGH_COMPATIBILITY = True
class XMLReflector(object):
"""Helper class for loading the Creole XML file,
parsing it, validating against the Creole DTD,
writing the xml result on the disk
"""
def __init__(self):
self.dtd = None
def parse_dtd(self, dtdfilename):
"""Loads the Creole DTD
:raises IOError: if the DTD is not found
:param dtdfilename: the full filename of the Creole DTD
"""
if not isfile(dtdfilename):
raise IOError(_("no such DTD file: {}").format(dtdfilename))
with open(dtdfilename, 'r') as dtdfd:
self.dtd = DTD(dtdfd)
def parse_xmlfile(self, xmlfile):
"""Parses and validates some Creole XML against the Creole DTD
:returns: the root element tree object
"""
# FIXME zephir2
# document = parse(BytesIO(xmlfile), XMLParser(remove_blank_text=True))
document = parse(xmlfile)
if not self.dtd.validate(document):
raise CreoleDictConsistencyError(_("not a valid xml file: {}").format(xmlfile))
return document.getroot()
def load_xml_from_folders(self, xmlfolders, from_zephir):
"""Loads all the XML files located in the xmlfolders' list
:param xmlfolders: list of full folder's name
"""
documents = []
if from_zephir:
for idx, xmlfile in enumerate(xmlfolders):
documents.append(('generate_{}'.format(idx), self.parse_xmlfile(xmlfile, from_zephir=from_zephir)))
else:
if not isinstance(xmlfolders, list):
xmlfolders = [xmlfolders]
for xmlfolder in xmlfolders:
if isinstance(xmlfolder, list) or isinstance(xmlfolder, tuple):
# directory group : collect files from each
# directory and sort them before loading
group_files = []
for idx, subdir in enumerate(xmlfolder):
if isdir(subdir):
for filename in listdir(subdir):
group_files.append((filename, idx, subdir))
else:
group_files.append(basename(subdir), idx, dirname(subdir))
def sort_group(file1, file2):
if file1[0] == file2[0]:
# sort by initial xmlfolder order if same name
return file1[1].__cmp__(file2[1])
# sort by filename
elif file1[0] > file2[0]:
return 1
else:
return -1
group_files.sort(sort_group)
filenames = [join(f[2], f[0]) for f in group_files]
elif isdir(xmlfolder):
filenames = []
for filename in listdir(xmlfolder):
filenames.append(join(xmlfolder, filename))
filenames.sort()
else:
filenames = [xmlfolder]
for xmlfile in filenames:
if xmlfile.endswith('.xml'):
#xmlfile_path = join(xmlfolder, xmlfile)
documents.append((xmlfile, self.parse_xmlfile(xmlfile)))
return documents
def save_xmlfile(self, xmlfilename, xml): # pylint: disable=R0201
"""Write a bunch of XML on the disk
"""
with open(xmlfilename, 'w') as xmlfh:
xmlfh.write(tostring(xml, pretty_print=True, encoding="UTF-8", xml_declaration=True).decode('utf8'))