rougail/creole/annotator.py

1335 lines
67 KiB
Python

# coding: utf-8
from copy import copy
from collections import OrderedDict
from os.path import join, basename
from ast import literal_eval
import sys
import imp
from .i18n import _
from .utils import normalize_family
from .config import VIRTBASE, VIRTROOT, VIRTMASTER, templatedir
from .error import CreoleDictConsistencyError
from .xmlreflector import HIGH_COMPATIBILITY
# Mode names ordered from the lowest level to the highest.
# The order is important: mode_factory() uses the position of each name in
# this tuple as the numeric level of the corresponding Mode object.
modes_level = ('basic', 'normal', 'expert')
class Mode(object):
    """A named mode that is ordered and compared through its numeric level.

    Two modes are equal when their levels are equal, whatever their names.

    Attributes are set by the constructor:
        name: the mode's name.
        level: the value used for every comparison and for hashing.
    """

    def __init__(self, name, level):
        self.name = name
        self.level = level

    def __cmp__(self, other):
        # Only Python 2 calls this hook. The ``cmp`` builtin does not exist on
        # Python 3, so the -1/0/1 result is computed by hand.
        return (self.level > other.level) - (self.level < other.level)

    def __hash__(self):
        # Defining __eq__ removes the inherited hash on Python 3; hash on the
        # level so that equal modes hash alike.
        return hash(self.level)

    def __eq__(self, other):
        return self.level == other.level

    def __ne__(self, other):
        return self.level != other.level

    def __lt__(self, other):
        # Explicit, so ``a < b`` does not depend on the reflected __gt__.
        return self.level < other.level

    def __gt__(self, other):
        return other.level < self.level

    def __ge__(self, other):
        return not self.level < other.level

    def __le__(self, other):
        return not other.level < self.level
def mode_factory():
    """Build the lookup table that maps a mode name to its Mode object.

    Every name of ``modes_level`` gets a ``Mode`` whose level is the position
    of the name in that tuple.

    Returns:
        dict: ``{name: Mode(name, position)}``.
    """
    return {name: Mode(name, level) for level, name in enumerate(modes_level)}
# Mode name -> Mode object table, built once when the module is imported.
modes = mode_factory()
# a CreoleObjSpace's attribute has some annotations
# that shall not be present in the exported (flattened) XML.
# In this file these names are also skipped when element attributes are
# turned into variables (_reorder_elts / make_group_from_elts).
ERASED_ATTRIBUTES = ('redefine', 'exists', 'fallback', 'optional', 'remove_check', 'namespace',
'remove_condition', 'path', 'instance_mode', 'index', 'is_in_leadership') # , '_real_container')
# NOTE(review): not referenced in the visible part of this file; presumably the
# container counterpart of ERASED_ATTRIBUTES - confirm against its users.
ERASED_CONTAINER_ATTRIBUTES = ('id', 'container', 'group_id', 'group', 'container_group')
# Element types for which make_group_from_elts() neither adds an 'activate'
# variable nor registers list conditions.
NOT_NEED_ACTIVATE = ('disknod',)
# Variable types that _convert_valid_enum() turns into a choice restricted to
# the listed string values.
FORCE_CHOICE = {'oui/non': ['oui', 'non'],
'on/off': ['on', 'off'],
'yes/no': ['yes', 'no'],
'schedule': ['none', 'daily', 'weekly', 'monthly'],
'schedulemod': ['pre', 'post']}
# Maps the value of an element's '<key>_type' attribute to the variable type
# used by make_group_from_elts().
KEY_TYPE = {'SymLinkOption': 'symlink',
'PortOption': 'port',
'UnicodeOption': 'string',
'NetworkOption': 'network',
'NetmaskOption': 'netmask',
'URLOption': 'web_address',
'FilenameOption': 'filename'}
# Param types accepted in a check (enforced by filter_check()).
TYPE_PARAM_CHECK = ('string', 'python', 'eole')
# NOTE(review): the two tuples below are not referenced in the visible part of
# this file; presumably the accepted param types for conditions and fills.
TYPE_PARAM_CONDITION = ('string', 'python', 'number', 'eole')
TYPE_PARAM_FILL = ('string', 'eole', 'number', 'context')
# Per-attribute variable types for 'disknod' elements; found by name
# ('<ELTTYPE>_KEY_TYPE') through globals() in convert_container_to_family().
DISKNOD_KEY_TYPE = {'major': 'number',
'minor': 'number'}
# Attributes of a family_action that convert_family_action() does not turn
# into variables.
ERASED_FAMILY_ACTION_ATTRIBUTES = ('index', 'action')
# Source variable of the 'auto_frozen_if_in' condition generated by
# convert_auto_freeze().
FREEZE_AUTOFREEZE_VARIABLE = 'module_instancie'
class ContainerAnnotator:
    """Manage container's object

    Converts the ``containers`` part of the object space into families and
    variables: every container becomes a family, every kind of element it
    holds becomes a sub-family, and every attribute of an element becomes a
    variable registered in ``paths``.
    """

    def __init__(self, objectspace):
        """Keep handles on the object space and convert its containers."""
        self.space = objectspace.space
        self.paths = objectspace.paths
        self.objectspace = objectspace
        # for example::
        #     <service_access service='ntp'>
        #         <port protocol='udp' service_accesslist='ntp_udp'>123</port>
        #         <tcpwrapper>ntpd</tcpwrapper>
        #     </service_access>
        # Filled by _split_elts(): split element -> automatic list name.
        self.grouplist_conditions = {}
        self.convert_containers()

    def convert_containers(self):
        """Replace ``space.containers.container`` by ``container<N>`` families.

        When ``containers`` holds no ``container`` attribute the whole
        ``containers`` attribute is removed from the space.
        """
        if not hasattr(self.space, 'containers'):
            return
        if not hasattr(self.space.containers, 'container'):
            del self.space.containers
            return
        self.convert_all()
        # self.space.containers.containers = self.objectspace.containers()
        for idx, container in enumerate(self.space.containers.container.values()):
            family = self.objectspace.family()
            family.name = 'container{}'.format(idx)
            family.doc = container.name
            family.family = OrderedDict()
            self.convert_container_to_family(family.family, container)
            setattr(self.space.containers, family.name, family)
        del self.space.containers.container

    def convert_all(self):
        """Copy the elements of the ``all`` pseudo container into every container.

        Raises:
            CreoleDictConsistencyError: a named element of ``all`` already
                exists in a container.
        """
        if not hasattr(self.space.containers, 'all'):
            return
        # Remove "all" and dispatch informations in all containers
        for type_, containers in vars(self.space.containers.all).items():
            if type_ == 'index':
                continue
            if isinstance(containers, list):
                for elt in containers:
                    for container in self.space.containers.container.values():
                        if container.name == 'all':
                            continue
                        if not hasattr(container, type_):
                            setattr(container, type_, [])
                        new_elt = copy(elt)
                        new_elt.container = container
                        getattr(container, type_).append(new_elt)
            else:
                for name, elt in containers.items():
                    for container in self.space.containers.container.values():
                        if container.name == 'all':
                            continue
                        if not hasattr(container, type_):
                            setattr(container, type_, OrderedDict())
                        old_container = getattr(container, type_)
                        if name in old_container:
                            raise CreoleDictConsistencyError('{}'.format(name))
                        new_elt = copy(elt)
                        new_elt.container = container
                        old_container[name] = new_elt
        del self.space.containers.all

    def convert_container_to_family(self, container_family, container):
        """Add one family per kind of element held by ``container``.

        Args:
            container_family: mapping that receives the new families, keyed
                by family name.
            container: the container object to transform.
        """
        # tranform container object to family object
        # add services, service_accesses, ...
        for elttype in self.objectspace.container_elt_attr_list:
            if not hasattr(container, elttype):
                continue
            family = self.objectspace.family()
            # an optional module-level '<ELTTYPE>_KEY_TYPE' dict gives the
            # variable type of some attributes
            key_type = globals().get(elttype.upper() + '_KEY_TYPE', {})
            if elttype.endswith('s'):
                family.name = elttype + 'es'
            else:
                family.name = elttype + 's'
            values = getattr(container, elttype)
            if isinstance(values, dict):
                values = list(values.values())
            family.family = self.make_group_from_elts(elttype,
                                                      values,
                                                      key_type,
                                                      'containers.{}'.format(family.name),
                                                      True)
            family.mode = None
            container_family[family.name] = family

    def _generate_element(self, eltname, name, value, type_, subpath, multi=False):
        """Create a variable for one attribute and register its path.

        Args:
            eltname: element kind, used to look up forced choices.
            name: name of the variable.
            value: raw value; ``None`` means "no value attribute".
            type_: variable type; a 'string' with forced choices becomes
                'choice', a 'symlink' stores its value in ``opt``.
            subpath: path of the owning family.
            multi: when true ``value`` is a list of objects with a ``name``.

        Returns:
            The new variable object.
        """
        var_data = {'name': name, 'doc': '', 'value': value,
                    'auto_freeze': False, 'mode': None, 'multi': multi}
        values = None
        if type_ == 'string':
            values = self.objectspace.forced_choice_option.get(eltname, {}).get(name)
            if values is not None:
                type_ = 'choice'
        var_data['type'] = type_

        variable = self.objectspace.variable()
        if not HIGH_COMPATIBILITY:
            variable.mandatory = True
        for key, value in var_data.items():
            if key == 'value':
                if value is None:
                    continue
                if type_ == 'symlink':
                    key = 'opt'
                else:
                    # Value is a list of objects
                    if not multi:
                        val = self.objectspace.value()
                        val.name = value
                        value = [val]
                    else:
                        value_list = []
                        for valiter in value:
                            val = self.objectspace.value()
                            val.name = valiter.name
                            value_list.append(val)
                        value = value_list
            if key == 'doc' and type_ == 'symlink':
                continue
            setattr(variable, key, value)
        if values is not None:
            choices = []
            for value in values:
                choice = self.objectspace.choice()
                choice.name = value
                choices.append(choice)
            variable.choice = choices
        path = '{}.{}'.format(subpath, name)
        self.paths.append('variable', path, 'containers', 'containers', variable)
        return variable

    def _register_auto(self, auto):
        """Append ``auto`` to ``space.constraints.auto``, creating what is missing."""
        # constraints must exist before its 'auto' list is looked up
        if not hasattr(self.space, 'constraints'):
            self.space.constraints = self.objectspace.constraints()
        if not hasattr(self.space.constraints, 'auto'):
            self.space.constraints.auto = []
        self.space.constraints.auto.append(auto)

    def _make_disknod_auto(self, type_, index, variable):
        """Register the 'cdrom_minormajor' auto that fills disknod<index>.<type_>."""
        auto = self.objectspace.auto()
        self.objectspace.index += 1
        auto.index = self.objectspace.index
        auto.namespace = 'containers'
        param1 = self.objectspace.param()
        param1.text = type_
        param2 = self.objectspace.param()
        param2.text = variable.name
        auto.param = [param1, param2]
        auto.name = 'cdrom_minormajor'
        family = 'disknod{}'.format(index)
        auto.target = 'containers.disknods.{}.{}'.format(family, type_)
        self._register_auto(auto)

    def _make_disknod_type(self, index, variable):
        """Register the 'device_type' auto that fills disknod<index>.type."""
        auto = self.objectspace.auto()
        self.objectspace.index += 1
        auto.index = self.objectspace.index
        auto.namespace = 'containers'
        param = self.objectspace.param()
        param.text = variable.name
        auto.param = [param]
        auto.name = 'device_type'
        family = 'disknod{}'.format(index)
        auto.target = 'containers.disknods.{}.type'.format(family)
        self._register_auto(auto)

    def _update_disknod(self, disknod, index):
        """Give a disknod its calculated attributes and fixed permissions."""
        disknod.major = None
        disknod.minor = None
        disknod.type = None
        self._make_disknod_auto('minor', index, disknod)
        self._make_disknod_auto('major', index, disknod)
        self._make_disknod_type(index, disknod)
        disknod.mode = u'rwm'
        disknod.permission = 'allow'

    def _update_file(self, file_, index):
        """Default the ``source`` of a file to the base name of its ``name``."""
        if not hasattr(file_, 'source'):
            file_.source = basename(file_.name)

    def _split_elts(self, name, key, value, elt):
        """for example::

            <service_access service='ntp'>
                <port protocol='udp' service_accesslist='ntp_udp'>123</port>
                <tcpwrapper>ntpd</tcpwrapper>
            </service_access>

        builds a `service_access` object, but we need **two** objects `service_access`,
        for example one for the port and one for the tcpwrapper

        Yields one copy of ``elt`` per item of ``value``; each copy receives
        the public attributes of the item plus ``node_name`` and, when the
        item has a ``<key>_type`` attribute, ``name_type``.

        Raises:
            CreoleDictConsistencyError: an attribute of the item already
                exists on the copied element.
        """
        for subelt in value:
            new_elt = copy(elt)
            for subsubelt in dir(subelt):
                if subsubelt.startswith('_') or subsubelt == 'index':
                    continue
                if hasattr(new_elt, subsubelt):
                    if hasattr(elt, 'name'):
                        name_ = elt.name
                    else:
                        name_ = elt.service
                    raise CreoleDictConsistencyError(_('attribute {} already exists '
                                                       'for {}').format(subsubelt,
                                                                        name_))
                setattr(new_elt, subsubelt, getattr(subelt, subsubelt))
            if hasattr(new_elt, 'node_name') or hasattr(new_elt, 'name_type'):
                raise CreoleDictConsistencyError(_('attribute node_name or name_type '
                                                   'already exists for {}'
                                                   '').format(name))
            if hasattr(subelt, key + '_type'):
                type_ = getattr(subelt, key + '_type')
                setattr(new_elt, 'name_type', type_)
            setattr(new_elt, 'node_name', key)
            if not hasattr(new_elt, name + 'list'):
                setattr(new_elt, name + 'list', '___auto_{}'.format(elt.service))
            else:
                # the element keeps its own list; remember the automatic one
                self.grouplist_conditions[new_elt] = '___auto_{}'.format(elt.service)
            yield new_elt

    def _reorder_elts(self, name, elts, duplicate_list):
        """Reorders by index the elts (the interface,
        the hosts, actions...)

        Elements without ``index`` come first, then the others by increasing
        index. The result is grouped by element name and returned as a flat
        list of ``{'elt_name': ..., 'elt': ...}`` dicts.
        """
        dict_elts = OrderedDict()
        # reorder elts by index
        new_elts = {}
        not_indexed = []
        for elt in elts:
            if not hasattr(elt, 'index'):
                not_indexed.append(elt)
            else:
                new_elts.setdefault(elt.index, []).append(elt)
        elts = not_indexed
        for idx in sorted(new_elts):
            elts.extend(new_elts[idx])
        for idx, elt in enumerate(elts):
            elt_added = False
            for key in dir(elt):
                if key.startswith('_') or key.endswith('_type') or key in ERASED_ATTRIBUTES:
                    continue
                value = getattr(elt, key)
                if isinstance(value, list) and duplicate_list:
                    for new_elt in self._split_elts(name, key, value, elt):
                        dict_elts.setdefault(new_elt.name, []).append({'elt_name': key,
                                                                       'elt': new_elt})
                    elt_added = True
            if not elt_added:
                if hasattr(elt, 'name'):
                    eltname = elt.name
                else:
                    eltname = idx
                dict_elts.setdefault(eltname, []).append({'elt_name': name, 'elt': elt})

        result_elts = []
        for elt in dict_elts.values():
            result_elts.extend(elt)
        return result_elts

    def make_group_from_elts(self, name, elts, key_type, path, duplicate_list):
        """Splits each objects into a group (and `OptionDescription`, in tiramisu terms)
        and build elements and its attributes (the `Options` in tiramisu terms)

        Args:
            name: kind of the elements; prefix of the family names.
            elts: the elements to convert.
            key_type: attribute name -> variable type overrides.
            path: path of the parent family.
            duplicate_list: when true, list attributes split their element
                (see ``_split_elts``) instead of becoming variables.

        Returns:
            list: one family per (possibly split) element.
        """
        families = []
        new_elts = self._reorder_elts(name, elts, duplicate_list)
        for index, elt_info in enumerate(new_elts):
            elt = elt_info['elt']
            elt_name = elt_info['elt_name']

            # try to launch _update_xxxx() function
            update_elt = '_update_' + elt_name
            if hasattr(self, update_elt):
                getattr(self, update_elt)(elt, index)
            variables = []
            subpath = '{}.{}{}'.format(path, name, index)
            listname = '{}list'.format(name)
            activate_path = '.'.join([subpath, 'activate'])
            if name not in NOT_NEED_ACTIVATE and elt in self.grouplist_conditions:
                # FIXME turn the 'activate' that disappears into a boolean
                self.objectspace.list_conditions.setdefault(listname,
                                                            {}).setdefault(self.grouplist_conditions[elt],
                                                                           []).append(activate_path)
            for key in dir(elt):
                if key.startswith('_') or key.endswith('_type') or key in ERASED_ATTRIBUTES:
                    continue
                value = getattr(elt, key)
                if isinstance(value, list) and duplicate_list:
                    continue
                if key == 'container':
                    value = value.name
                if name not in NOT_NEED_ACTIVATE and key == listname:
                    self.objectspace.list_conditions.setdefault(listname,
                                                                {}).setdefault(
                                                                    value,
                                                                    []).append(activate_path)
                default_type = 'string'
                if key in self.objectspace.booleans_attributs:
                    default_type = 'boolean'
                type_ = key_type.get(key, default_type)
                dtd_key_type = key + '_type'
                if hasattr(elt, dtd_key_type):
                    type_ = KEY_TYPE[getattr(elt, dtd_key_type)]
                multi = isinstance(value, list)
                variables.append(self._generate_element(elt_name,
                                                        key,
                                                        value,
                                                        type_,
                                                        subpath,
                                                        multi))
            if name not in NOT_NEED_ACTIVATE:
                # FIXME should not be True by default
                variables.append(self._generate_element(name, 'activate', True, 'boolean', subpath))
            family = self.objectspace.family()
            family.name = '{}{}'.format(name, index)
            family.variable = variables
            family.mode = None
            self.paths.append('family', subpath, 'containers', creoleobj=family)
            families.append(family)
        return families
class ActionAnnotator(ContainerAnnotator):
    """Convert the ``family_action`` part of the object space into an
    ``actions`` family tree, reusing the element grouping of the parent class.
    """

    def __init__(self, objectspace):
        """Keep handles on the object space and convert its family actions."""
        self.space = objectspace.space
        self.paths = objectspace.paths
        self.objectspace = objectspace
        self.grouplist_conditions = {}
        self.convert_family_action()

    def convert_family_action(self):
        """Replace ``space.family_action`` by the ``space.actions`` family.

        Raises:
            CreoleDictConsistencyError: two actions share the same namespace.
        """
        if not hasattr(self.space, 'family_action'):
            return
        root = self.objectspace.family()
        root.name = 'actions'
        root.mode = None
        root.family = []
        self.space.actions = root
        seen_namespaces = []
        for family_name, family_action in self.space.family_action.items():
            subpath = 'actions.{}'.format(normalize_family(family_name))
            for action in family_action.action:
                namespace = action.namespace
                if namespace in seen_namespaces:
                    raise CreoleDictConsistencyError(
                        _('only one action allow for {}').format(namespace))
                seen_namespaces.append(namespace)
                action.name = action.namespace
            new_actions = self.make_group_from_elts(
                'action', family_action.action, {}, subpath, False)
            family = self.objectspace.family()
            family.name = family_action.name
            family.family = new_actions
            family.mode = None
            # every remaining attribute of the family action becomes a variable
            family.variable = [
                self._generate_element('action', key, value, 'string', subpath)
                for key, value in vars(family_action).items()
                if key not in ERASED_FAMILY_ACTION_ATTRIBUTES
            ]
            self.space.actions.family.append(family)
        del self.space.family_action
class SpaceAnnotator(object):
"""Transformations applied on a CreoleObjSpace instance
"""
# Set up the working state, load the optional function module and run every
# transformation on the object space, in the fixed order listed below.
#   objectspace: object exposing at least `.paths` and `.space`.
#   eosfunc_file: path of a Python source file loaded as module 'eosfunc',
#                 or None to run without it.
def __init__(self, objectspace, eosfunc_file):
self.paths = objectspace.paths
self.space = objectspace.space
self.objectspace = objectspace
# path -> {'type', 'values'} collected by filter_check(), consumed by
# convert_valid_enums()
self.valid_enums = {}
# path -> value forced by filter_check(), consumed by _convert_valid_enum()
self.force_value = {}
# path lists read by _annotate_variable(); they are not filled in the
# visible part of this file
self.has_calc = []
self.force_no_value = []
self.force_not_mandatory = []
if eosfunc_file is not None:
self.eosfunc = imp.load_source('eosfunc', eosfunc_file)
else:
self.eosfunc = None
# NOTE(review): the indentation was lost in this copy, so it cannot be told
# whether only the first or both of the next two assignments are guarded by
# HIGH_COMPATIBILITY - confirm against the original file.
if HIGH_COMPATIBILITY:
self.default_has_no_value = []
self.has_frozen_if_in_condition = []
# The steps below mutate self.space in place.
self.default_variable_options()
self.convert_auto_freeze()
self.convert_groups()
self.filter_check()
self.filter_condition()
self.convert_valid_enums()
self.convert_autofill()
self.remove_empty_families()
self.change_variable_mode()
self.change_family_mode()
self.filter_separators()
self.absolute_path_for_symlink_in_containers()
self.convert_helps()
def absolute_path_for_symlink_in_containers(self):
    """Resolve the target of container symlink variables to a full path.

    For every symlink variable of the container families whose name holds no
    dot, ``opt`` is replaced by the path that ``paths.get_variable_path``
    returns for it in the 'creole' namespace.
    """
    containers = getattr(self.space, 'containers', None)
    if not hasattr(self.space, 'containers') or not hasattr(containers, 'family'):
        return
    for container_family in containers.family.values():
        if not hasattr(container_family, 'family'):
            continue
        for subfamily in container_family.family:
            for variable in subfamily.variable:
                is_symlink = variable.type == 'symlink'
                if is_symlink and '.' not in variable.name:
                    variable.opt = self.paths.get_variable_path(variable.opt, 'creole')
def convert_helps(self):
    """Copy every help text onto its variable or family, then drop ``space.help``."""
    # FIXME the help should live in the variable itself!
    if not hasattr(self.space, 'help'):
        return
    helps = self.space.help
    # (attribute of helps, lookup of the object receiving the text)
    if hasattr(helps, 'variable'):
        for entry in helps.variable.values():
            target = self.paths.get_variable_obj(entry.name)
            target.help = entry.text
    if hasattr(helps, 'family'):
        for entry in helps.family.values():
            target = self.paths.get_family_obj(entry.name)
            target.help = entry.text
    del self.space.help
# Turn every group of self.space.constraints.group into a Leadership object.
# For each group, the leader variable (group.master) and its followers (keys of
# group.slave) are looked up in the leader's family; the family entry of the
# leader is replaced by an objectspace.Leadership holding the leader and the
# followers, and each follower is removed from the family. The group list is
# deleted at the end.
# Raises CreoleDictConsistencyError when a leader or follower is not multi, or
# when an expected follower is not found.
# NOTE(review): the indentation was lost in this copy; the nesting described in
# the comments below is the most plausible reading and must be confirmed
# against the original file.
def convert_groups(self): # pylint: disable=C0111
if hasattr(self.space, 'constraints'):
if hasattr(self.space.constraints, 'group'):
for group in self.space.constraints.group:
leader_fullname = group.master
follower_names = list(group.slave.keys())
leader_family_name = self.paths.get_variable_family_name(leader_fullname)
namespace = self.paths.get_variable_namespace(leader_fullname)
leader_name = self.paths.get_variable_name(leader_fullname)
leader_family = self.space.variables[namespace].family[leader_family_name]
leader_path = namespace + '.' + leader_family_name
is_leader = False
# iterate over a copy: the family mapping is modified inside the loop
for variable in list(leader_family.variable.values()):
if isinstance(variable, self.objectspace.Leadership):
# append follower to an existed leadership
if variable.name == leader_name:
leader_space = variable
is_leader = True
else:
if is_leader:
# once the leader is known, the next expected follower is follower_names[0]
if variable.name == follower_names[0]:
# followers are multi
if not variable.multi is True:
raise CreoleDictConsistencyError(_('the variable {} in a group must be multi').format(variable.name))
follower_names.remove(variable.name)
leader_family.variable.pop(variable.name)
leader_space.variable.append(variable) # pylint: disable=E1101
# in the 'creole' namespace the bare variable name is used as its path
if namespace == 'creole':
variable_fullpath = variable.name
else:
variable_fullpath = leader_path + '.' + variable.name
self.paths.set_leader(variable_fullpath, leader_name)
# every follower has been attached: stop scanning the family
if follower_names == []:
break
else:
raise CreoleDictConsistencyError(_('cannot found this follower {}').format(follower_names[0]))
# the leader itself is met for the first time: create its Leadership
if is_leader is False and variable.name == leader_name:
leader_space = self.objectspace.Leadership()
leader_space.variable = []
leader_space.name = leader_name
# manage leader's variable
if variable.multi is not True:
raise CreoleDictConsistencyError(_('the variable {} in a group must be multi').format(variable.name))
leader_family.variable[leader_name] = leader_space
leader_space.variable.append(variable) # pylint: disable=E1101
self.paths.set_leader(leader_fullname, leader_name)
leader_space.path = leader_fullname
is_leader = True
# presumably the `else` of the `for` loop: reached when the family was scanned
# without hitting the `break` above - TODO confirm
else:
raise CreoleDictConsistencyError(_('cannot found followers {}').format(follower_names))
del self.space.constraints.group
def remove_empty_families(self): # pylint: disable=C0111,R0201
    """Drop the families that hold no variable, and their help entries.

    A family is empty when it has no ``variable`` attribute or when that
    attribute has length 0. The mappings are modified in place.
    """
    if not hasattr(self.space, 'variables'):
        return
    for namespace in self.space.variables.values():
        if not hasattr(namespace, 'family'):
            continue
        families = namespace.family
        removed_families = []
        # iterate over a snapshot: deleting from a dict while iterating over
        # it raises RuntimeError on Python 3
        for family_name, family in list(families.items()):
            if not hasattr(family, 'variable') or len(family.variable) == 0:
                removed_families.append(family_name)
                del families[family_name]
        # remove help too
        if hasattr(self.space, 'help') and hasattr(self.space.help, 'family'):
            for family_name in list(self.space.help.family.keys()):
                if family_name in removed_families:
                    del self.space.help.family[family_name]
def change_family_mode(self): # pylint: disable=C0111
    """Give every family the lowest mode found among its variables.

    A Leadership first takes the mode of its first variable. Variables
    whose mode is None are ignored. The family named 'Containers' always
    gets the mode 'normal'.
    """
    if not hasattr(self.space, 'variables'):
        return
    for namespace in self.space.variables.values():
        if not hasattr(namespace, 'family'):
            continue
        for family in namespace.family.values():
            # start from the highest mode and lower it as needed
            lowest = modes_level[-1]
            for variable in family.variable.values():
                if isinstance(variable, self.objectspace.Leadership):
                    candidate = variable.variable[0].mode
                    variable.mode = candidate
                else:
                    candidate = variable.mode
                if candidate is not None and modes[lowest] > modes[candidate]:
                    lowest = candidate
            family.mode = 'normal' if family.name == 'Containers' else lowest
# Adjust, in place, the mode / mandatory / frozen / force_default_on_freeze
# attributes of one variable.
#   variable:    the variable object to adjust.
#   family_mode: mode name of the owning family (a key of `modes`).
#   path:        path of the variable, looked up in self.has_calc,
#                self.force_no_value, self.force_not_mandatory and, in
#                HIGH_COMPATIBILITY, self.default_has_no_value.
#   is_follower: flag passed by change_variable_mode() for followers.
# NOTE(review): the indentation was lost in this copy; in particular it cannot
# be told which of the statements after `if variable.hidden is True:` are
# nested under it - confirm against the original file.
def _annotate_variable(self, variable, family_mode, path, is_follower=False):
# modes_level[0] is the lowest mode ('basic'), modes_level[-1] the highest
if (HIGH_COMPATIBILITY and variable.type == 'choice' and variable.mode != modes_level[-1] and variable.mandatory is True and path in self.default_has_no_value):
variable.mode = modes_level[0]
if variable.type == 'choice' and is_follower and family_mode == modes_level[0] and variable.mandatory is True:
variable.mode = modes_level[0]
# if the variable is mandatory and doesn't have any value
# then the variable's mode is set to 'basic'
has_value = hasattr(variable, 'value')
if (path not in self.has_calc and variable.mandatory is True and
(not has_value or is_follower) and variable.type != 'choice'):
variable.mode = modes_level[0]
# a variable with a value becomes mandatory, except (in HIGH_COMPATIBILITY)
# for 'creole.containers.' paths and for paths listed in force_no_value or
# force_not_mandatory
if has_value:
if not HIGH_COMPATIBILITY or (not path.startswith('creole.containers.') \
and path not in self.force_no_value and path not in self.force_not_mandatory):
variable.mandatory = True
# hidden variables are frozen
if variable.hidden is True:
variable.frozen = True
if not variable.auto_save is True and 'force_default_on_freeze' not in vars(variable):
variable.force_default_on_freeze = True
if variable.name == 'frozen' and not variable.auto_save is True:
variable.force_default_on_freeze = True
# a variable is raised to the family's mode when its own mode is lower
if variable.mode != None and not is_follower and modes[variable.mode] < modes[family_mode]:
variable.mode = family_mode
if variable.mode != None and variable.mode != modes_level[0] and modes[variable.mode] < modes[family_mode]:
variable.mode = family_mode
# special case by name: this variable never forces its default on freeze
if variable.name == "available_probes":
variable.force_default_on_freeze = False
def default_variable_options(self):
    """Fill in the default ``type`` and ``description`` of every variable.

    A variable without type becomes a 'string'. A variable that is not a
    symlink and has no description is described by its own name.
    """
    if not hasattr(self.space, 'variables'):
        return
    for namespace in self.space.variables.values():
        if not hasattr(namespace, 'family'):
            continue
        for family in namespace.family.values():
            if not hasattr(family, 'variable'):
                continue
            for variable in family.variable.values():
                if not hasattr(variable, 'type'):
                    variable.type = 'string'
                needs_description = not hasattr(variable, 'description')
                if variable.type != 'symlink' and needs_description:
                    variable.description = variable.name
def convert_auto_freeze(self): # pylint: disable=C0111
    """Add an 'auto_frozen_if_in' condition for every auto_freeze variable.

    The condition has FREEZE_AUTOFREEZE_VARIABLE as source, the single param
    'oui' and the variable as its only target; it is appended to
    ``space.constraints.condition``, which is created when missing.
    """
    if not hasattr(self.space, 'variables'):
        return
    for variables in self.space.variables.values():
        if not hasattr(variables, 'family'):
            continue
        for family in variables.family.values():
            if not hasattr(family, 'variable'):
                continue
            for variable in family.variable.values():
                if not variable.auto_freeze:
                    continue
                new_condition = self.objectspace.condition()
                new_condition.name = 'auto_frozen_if_in'
                new_condition.namespace = variables.name
                new_condition.source = FREEZE_AUTOFREEZE_VARIABLE
                new_param = self.objectspace.param()
                new_param.text = 'oui'
                new_condition.param = [new_param]
                new_target = self.objectspace.target()
                new_target.type = 'variable'
                # in the 'creole' namespace the bare name is the target
                if variables.name == 'creole':
                    new_target.name = variable.name
                else:
                    new_target.name = '.'.join(
                        [variable.namespace, family.name, variable.name])
                new_condition.target = [new_target]
                if not hasattr(self.space.constraints, 'condition'):
                    self.space.constraints.condition = []
                self.space.constraints.condition.append(new_condition)
def _set_valid_enum(self, variable, values, type_):
if isinstance(values, list):
variable.mandatory = True
variable.choice = []
choices = []
for value in values:
choice = self.objectspace.choice()
choice.name = str(value)
choices.append(choice.name)
choice.type = type_
variable.choice.append(choice)
if not variable.choice:
raise CreoleDictConsistencyError(_('empty valid enum is not allowed for variable {}').format(variable.name))
if hasattr(variable, 'value'):
for value in variable.value:
value.type = type_
if value.name not in choices:
raise CreoleDictConsistencyError(_('value "{}" of variable "{}" is not in list of all expected values ({})').format(value.name, variable.name, choices))
else:
new_value = self.objectspace.value()
new_value.name = values[0]
new_value.type = type_
variable.value = [new_value]
else:
# probe choice
variable.choice = values
variable.type = 'choice'
def _convert_valid_enum(self, variable, path):
    """Apply the forced choices, valid_enum and forced value of one variable.

    Entries found for ``path`` in ``self.valid_enums`` and
    ``self.force_value`` are consumed (removed) once applied.

    Raises:
        CreoleDictConsistencyError: a valid_enum is set on a variable whose
            type already implies a fixed list of choices.
    """
    forced_type = variable.type
    if forced_type in FORCE_CHOICE:
        if path in self.valid_enums:
            raise CreoleDictConsistencyError(_('cannot set valid enum for variable with type {}').format(variable.type))
        self._set_valid_enum(variable, FORCE_CHOICE[forced_type], 'string')
    if path in self.valid_enums:
        enum_values = self.valid_enums[path]['values']
        self._set_valid_enum(variable, enum_values, variable.type)
        del self.valid_enums[path]
    if path in self.force_value:
        forced = self.objectspace.value()
        forced.name = self.force_value[path]
        variable.value = [forced]
        del self.force_value[path]
def convert_valid_enums(self): # pylint: disable=C0111
    """Run ``_convert_valid_enum`` on every variable of every family.

    The members of a Leadership are visited one by one, with the path
    ``namespace.family.leadership.member``; other variables use
    ``namespace.family.variable``.

    Raises:
        CreoleDictConsistencyError: entries remain in ``self.valid_enums``
            once every variable has been visited.
    """
    namespaces = self.space.variables.values() if hasattr(self.space, 'variables') else ()
    for variables in namespaces:
        namespace = variables.name
        if not hasattr(variables, 'family'):
            continue
        for family in variables.family.values():
            if not hasattr(family, 'variable'):
                continue
            for variable in family.variable.values():
                if not isinstance(variable, self.objectspace.Leadership):
                    path = '{}.{}.{}'.format(namespace, family.name, variable.name)
                    self._convert_valid_enum(variable, path)
                    continue
                for follower in variable.variable:
                    path = '{}.{}.{}.{}'.format(namespace, family.name, variable.name, follower.name)
                    self._convert_valid_enum(follower, path)
    # valid_enums must be empty now (all information are store in objects)
    if self.valid_enums:
        raise CreoleDictConsistencyError(_('valid_enum sets for unknown variables {}').format(self.valid_enums.keys()))
# Adjust the mode of every variable of every family, then let
# _annotate_variable() finish the job with the family's mode.
# The members of a Leadership are handled together; they may be neither
# auto_save nor auto_freeze (CreoleDictConsistencyError otherwise).
# NOTE(review): the indentation was lost in this copy; the nesting described in
# the comments below is the most plausible reading and must be confirmed
# against the original file.
def change_variable_mode(self): # pylint: disable=C0111
if not hasattr(self.space, 'variables'):
return
for variables in self.space.variables.values():
if hasattr(variables, 'family'):
for family in variables.family.values():
family_mode = family.mode
if hasattr(family, 'variable'):
for variable in family.variable.values():
if isinstance(variable, self.objectspace.Leadership):
# `mode` tracks the lowest mode met among the members; it starts
# from the highest one (modes_level[-1])
mode = modes_level[-1]
for follower in variable.variable:
if follower.auto_save is True:
raise CreoleDictConsistencyError(_('leader/followers {} '
'could not be '
'auto_save').format(follower.name))
if follower.auto_freeze is True:
raise CreoleDictConsistencyError(_('leader/followers {} '
'could not be '
'auto_freeze').format(follower.name))
# only in HIGH_COMPATIBILITY is a member whose name differs from the
# Leadership's name flagged as a follower for _annotate_variable()
if HIGH_COMPATIBILITY and variable.name != follower.name: # and variable.variable[0].mode != modes_level[0]:
is_follower = True
else:
is_follower = False
path = '{}.{}.{}.{}'.format(variables.name, family.name, variable.name, follower.name)
self._annotate_variable(follower, family_mode, path, is_follower)
if HIGH_COMPATIBILITY:
# leader's variable are right
if modes[variable.variable[0].mode] > modes[follower.mode]:
follower.mode = variable.variable[0].mode
else:
# auto_save's variable is set in 'basic' mode if its mode is 'normal'
if follower.auto_save is True and follower.mode != modes_level[-1]:
follower.mode = modes_level[0]
if modes[mode] > modes[follower.mode]:
mode = follower.mode
if not HIGH_COMPATIBILITY:
# the leader's mode is the lowest
variable.variable[0].mode = mode
# the Leadership object itself carries the mode of its first variable
variable.mode = variable.variable[0].mode
else:
# auto_save's variable is set in 'basic' mode if its mode is 'normal'
if variable.auto_save is True and variable.mode != modes_level[-1]:
variable.mode = modes_level[0]
# auto_freeze's variable is set in 'basic' mode if its mode is 'normal'
if variable.auto_freeze is True and variable.mode != modes_level[-1]:
variable.mode = modes_level[0]
path = '{}.{}.{}'.format(variables.name, family.name, variable.name)
self._annotate_variable(variable, family_mode, path)
def get_variable(self, name): # pylint: disable=C0111
    """Return the variable object that ``self.paths`` knows under ``name``."""
    variable = self.paths.get_variable_obj(name)
    return variable
def convert_autofill(self): # pylint: disable=C0111
    """Process the 'auto' and 'fill' constraints, when there are any.

    Duplicates are resolved first, then the autos are converted, then the
    fills.
    """
    if not hasattr(self.space, 'constraints'):
        return
    self.convert_duplicate_autofill(self.space.constraints)
    # vars() is evaluated again before each step: convert_auto() may add the
    # 'fill' attribute
    if 'auto' in vars(self.space.constraints):
        self.convert_auto(self.space.constraints.auto, self.space)
    if 'fill' in vars(self.space.constraints):
        self.convert_fill(self.space.constraints.fill, self.space)
def convert_duplicate_autofill(self, constraints):
    """ Remove duplicate auto or fill for a variable
    This variable must be redefined

    The autos and fills are walked by increasing ``index``. When a target is
    met again, the earlier auto/fill is removed from ``constraints`` if the
    later one has a true ``redefine`` attribute.

    Raises:
        Exception: two autos/fills share the same index.
        CreoleDictConsistencyError: a target is met again without redefine.
    """
    # sort fill/auto by index
    by_index = {}
    for kind in ('fill', 'auto'):
        if kind not in vars(constraints):
            continue
        for position, entry in enumerate(getattr(constraints, kind)):
            if entry.index in by_index:
                raise Exception('hu?')
            by_index[entry.index] = {'idx': position, 'fill': entry, 'type': kind}
    # target -> (position in its list, 'auto' or 'fill') of the last one seen
    targets = {}
    obsolete = {'auto': [], 'fill': []}
    for index in sorted(by_index):
        record = by_index[index]
        entry = record['fill']
        redefine = bool(entry.redefine) if hasattr(entry, 'redefine') else False
        if entry.target in targets:
            if not redefine:
                raise CreoleDictConsistencyError(_("An auto or fill already exists "
                                                   "for the target: {}").format(
                                                       entry.target))
            previous_position, previous_kind = targets[entry.target]
            if previous_kind == 'auto':
                obsolete['auto'].append(previous_position)
            else:
                obsolete['fill'].append(previous_position)
        targets[entry.target] = (record['idx'], record['type'])
    # pop from the end so the remaining positions stay valid
    for position in sorted(obsolete['auto'], reverse=True):
        constraints.auto.pop(position)
    for position in sorted(obsolete['fill'], reverse=True):
        constraints.fill.pop(position)
def convert_auto(self, auto_space, space): # pylint: disable=C0111
    """Convert the autos into fills on hidden, frozen variables.

    Args:
        auto_space: the list of autos.
        space: the object space; its ``constraints.fill`` receives every
            auto and its ``constraints.auto`` is deleted.

    Raises:
        CreoleDictConsistencyError: the target of an auto is auto_freeze or
            auto_save.
    """
    for auto in auto_space:
        if HIGH_COMPATIBILITY and auto.target in self.has_frozen_if_in_condition:
            # if a variable has a 'frozen_if_in' condition
            # then we change the 'auto' variable as a 'fill' variable
            continue
        # an auto is a fill with "hidden" and "frozen" properties
        target = self.get_variable(auto.target)
        if target.auto_freeze:
            raise CreoleDictConsistencyError(_('variable with auto value '
                                               'cannot be auto_freeze').format(auto.target))
        if target.auto_save:
            raise CreoleDictConsistencyError(_('variable with auto value '
                                               'cannot be auto_save').format(auto.target))
        for flag in ('hidden', 'frozen', 'force_default_on_freeze'):
            setattr(target, flag, True)
    constraints = space.constraints
    if 'fill' not in vars(constraints):
        constraints.fill = []
    constraints.fill.extend(auto_space)
    del constraints.auto
def filter_separators(self): # pylint: disable=C0111,R0201
    """Replace the name of every separator by the full path of its variable.

    Raises:
        CreoleDictConsistencyError: two separators of the same namespace
            resolve to the same path.
    """
    # FIXME should live in the variable
    if not hasattr(self.space, 'variables'):
        return
    for namespace_obj in self.space.variables.values():
        has_separators = (hasattr(namespace_obj, 'separators')
                          and hasattr(namespace_obj.separators, 'separator'))
        if not has_separators:
            continue
        seen_paths = []
        for separator in namespace_obj.separators.separator:
            namespace = self.paths.get_variable_namespace(separator.name)
            separator.name = self.paths.get_variable_path(separator.name, namespace)
            if separator.name in seen_paths:
                raise CreoleDictConsistencyError(_('{} already has a separator').format(separator.name))
            seen_paths.append(separator.name)
def load_params_in_validenum(self, param, probe):
    """Compute the allowed values described by a valid_enum param.

    Args:
        param: the param object; ``type`` and ``text`` are read.
        probe: when true the text is returned untouched.

    Returns:
        list: the values, for a 'string', 'number' or 'python' param when
        ``probe`` is false; otherwise ``param.text`` as-is.

    Raises:
        CreoleDictConsistencyError: the text is missing, cannot be loaded,
            uses an unknown name, or does not evaluate to a list.
    """
    if probe or param.type not in ['string', 'python', 'number']:
        return param.text
    if not hasattr(param, 'text') and (param.type == 'python' or param.type == 'number'):
        raise CreoleDictConsistencyError(_("All '{}' variables shall be set in order to calculate {}").format(param.type, 'valid_enum'))
    if param.type in ['string', 'number']:
        try:
            values = literal_eval(param.text)
        except (ValueError, SyntaxError):
            # literal_eval raises SyntaxError, not ValueError, when the text
            # is not a well-formed Python literal
            raise CreoleDictConsistencyError(_('Cannot load {}').format(param.text))
    else:
        # param.type == 'python'
        # NOTE(review): eval() runs text that comes from a dictionary file;
        # the builtins are restricted but this is not a sandbox.
        try:
            values = eval(param.text, {'eosfunc': self.eosfunc, '__builtins__': {'range': range, 'str': str}})
            #FIXME : eval('[str(i) for i in range(3, 13)]', {'eosfunc': eosfunc, '__builtins__': {'range': range, 'str': str}})
        except NameError:
            raise CreoleDictConsistencyError(_('The function {} is unknown').format(param.text))
    if not isinstance(values, list):
        raise CreoleDictConsistencyError(_('Function {} shall return a list').format(param.text))
    if sys.version_info[0] < 3:
        # Python 2: byte strings are decoded to unicode
        return [val.decode('utf-8') if isinstance(val, str) else val for val in values]
    return list(values)
def filter_check(self): # pylint: disable=C0111
    """Validate and normalise ``self.space.constraints.check`` in place.

    Does nothing when the space has no ``constraints`` or no ``check``.
    Otherwise the checks are processed in successive passes:

    1. every param type must be in ``TYPE_PARAM_CHECK``; 'eole' params are
       resolved to variable paths, optional params that cannot be resolved
       are dropped, and (outside HIGH_COMPATIBILITY) a check whose param
       list became empty is removed;
    2. every check target is replaced by its variable path and
       ``check.is_in_leadership`` is set; in HIGH_COMPATIBILITY mode checks
       whose target is not defined are skipped by this pass;
    3. in HIGH_COMPATIBILITY mode only the last ``valid_enum`` check of a
       target is kept; ``check.index`` is deleted on every check seen here;
    4. the check function must exist in ``self.eosfunc``; ``valid_enum``
       checks either feed ``self.force_value`` (``checkval`` is False) or are
       moved into ``self.valid_enums`` and removed from the space;
    5. ``level`` is converted to ``warnings_only`` and the params' ``hidden``
       flags to ``transitive``.

    Raises:
        CreoleDictConsistencyError: on an unsupported param type, an
            unresolvable non-optional 'eole' param, an unknown check
            function, or an invalid ``valid_enum`` declaration.
    """
    if not hasattr(self.space, 'constraints') or not hasattr(self.space.constraints, 'check'):
        return
    space = self.space.constraints.check

    # pass 1: valid param in check
    remove_indexes = []
    for check_idx, check in enumerate(space):
        namespace = check.namespace
        if hasattr(check, 'param'):
            # a set, because the same index can be flagged twice below
            param_option_indexes = set()
            for idx, param in enumerate(check.param):
                if param.type not in TYPE_PARAM_CHECK:
                    raise CreoleDictConsistencyError(_('cannot use {} type as a param in check for {}').format(param.type, check.target))
                if param.type == 'eole':
                    if HIGH_COMPATIBILITY and param.text.startswith('container_ip'):
                        if param.optional is True:
                            param_option_indexes.add(idx)
                    try:
                        param.text = self.paths.get_variable_path(param.text, namespace)
                    except CreoleDictConsistencyError:
                        if param.optional is not True:
                            # bare raise keeps the original traceback
                            raise
                        param_option_indexes.add(idx)
            # pop from the end so the remaining indexes stay valid
            for idx in sorted(param_option_indexes, reverse=True):
                check.param.pop(idx)
            if not HIGH_COMPATIBILITY and check.param == []:
                remove_indexes.append(check_idx)
    for idx in sorted(remove_indexes, reverse=True):
        del space[idx]

    # pass 2: group the checks by resolved target path
    variables = {}
    for index, check in enumerate(space):
        namespace = check.namespace
        if HIGH_COMPATIBILITY:
            if not self.paths.path_is_defined(check.target):
                continue
        check.is_in_leadership = self.paths.get_leader(check.target) is not None
        # let's replace the target by the path
        check.target = self.paths.get_variable_path(check.target, namespace)
        variables.setdefault(check.target, []).append((index, check))

    # pass 3: remove check already set for a variable
    remove_indexes = []
    for checks in variables.values():
        names = {}
        for idx, check in checks:
            redefine = HIGH_COMPATIBILITY and check.name == 'valid_enum'
            if redefine and check.name in names:
                # the later declaration wins, drop the previous one
                remove_indexes.append(names[check.name])
                del names[check.name]
            names[check.name] = idx
            del check.index
    for idx in sorted(remove_indexes, reverse=True):
        del space[idx]

    # pass 4: check functions must exist; consume valid_enum checks
    remove_indexes = []
    for idx, check in enumerate(space):
        if check.name not in dir(self.eosfunc):
            raise CreoleDictConsistencyError(_('cannot find check function {}').format(check.name))
        if check.name == 'valid_enum':
            proposed_value_type = False
            remove_params = []
            for param_idx, param in enumerate(check.param):
                if hasattr(param, 'name') and param.name == 'checkval':
                    try:
                        # checkval=False means the values are only proposed
                        proposed_value_type = self.objectspace._convert_boolean(param.text) is False
                        remove_params.append(param_idx)
                    except TypeError as err:
                        raise CreoleDictConsistencyError(_('cannot load checkval value for variable {}: {}').format(check.target, err))
            for param_idx in sorted(remove_params, reverse=True):
                del check.param[param_idx]
            # NOTE: this also fires when no param is left, although the
            # message only mentions "more than one"
            if len(check.param) != 1:
                raise CreoleDictConsistencyError(_('cannot set more than one param '
                                                   'for valid_enum for variable {}'
                                                   '').format(check.target))
            param = check.param[0]
            if proposed_value_type:
                if param.type != 'eole':
                    try:
                        values = self.load_params_in_validenum(param, check.probe)
                    except NameError as err:
                        raise CreoleDictConsistencyError(_('cannot load value for variable {}: {}').format(check.target, err))
                    add_value = not (HIGH_COMPATIBILITY and check.is_in_leadership)
                    if add_value and values:
                        self.force_value[check.target] = values[0]
            else:
                if check.target in self.valid_enums:
                    raise CreoleDictConsistencyError(_('valid_enum already set for {}'
                                                       '').format(check.target))
                values = self.load_params_in_validenum(param, check.probe)
                self.valid_enums[check.target] = {'type': param.type,
                                                  'values': values}
                remove_indexes.append(idx)
    for idx in sorted(remove_indexes, reverse=True):
        del space[idx]

    # pass 5: convert level to "warnings_only" and hidden to "transitive"
    for check in space:
        check.warnings_only = check.level == 'warning'
        check.level = None
        # transitive only if every param is explicitly hidden
        transitive = True
        if hasattr(check, 'param'):
            for param in check.param:
                if param.hidden is not True:
                    transitive = False
                param.hidden = None
        check.transitive = transitive
def convert_fill(self, fill_space, space): # pylint: disable=C0111,R0912
    """Validate the fill/auto constraints and resolve their paths in place.

    The fills are processed in ascending ``fill.index`` order (the ``index``
    attribute is deleted from every fill). For each fill:

    * the ``value`` attribute of the target variable is deleted if present;
    * ``fill.target`` is replaced by the variable path;
    * the fill function must exist in ``self.eosfunc``;
    * every param type must be in ``TYPE_PARAM_FILL``; 'eole', 'number' and
      'python' params must carry a ``text``; 'eole' params are resolved to
      variable paths and optional ones that cannot be resolved are dropped;
    * the resolved target is appended to ``self.has_calc``.

    ``space`` is not used: the only code that read it removed fills listed in
    a collection that was never populated. The parameter is kept so existing
    callers keep working.

    Raises:
        CreoleDictConsistencyError: on an unknown fill function, an
            unsupported param type, a param without text, or an
            unresolvable non-optional 'eole' param.
    """
    # sort fill/auto by index
    fills = {}
    for fill in fill_space:
        fills[fill.index] = fill
        del fill.index

    for index in sorted(fills):
        fill = fills[index]
        variable = self.get_variable(fill.target)
        if hasattr(variable, 'value'):
            del variable.value
        namespace = fill.namespace
        # let's replace the target by the path
        fill.target = self.paths.get_variable_path(fill.target, namespace)
        if fill.name not in dir(self.eosfunc):
            raise CreoleDictConsistencyError(_('cannot find fill function {}').format(fill.name))
        if hasattr(fill, 'param'):
            # validate every type first, before any param is modified
            for param in fill.param:
                if param.type not in TYPE_PARAM_FILL:
                    raise CreoleDictConsistencyError(_('cannot use {} type as a param '
                                                       'in a fill/auto').format(param.type))
            # a set, because the same index can be flagged twice below
            param_option_indexes = set()
            for fill_idx, param in enumerate(fill.param):
                if not hasattr(param, 'text') and \
                        param.type in ('eole', 'number', 'python'):
                    raise CreoleDictConsistencyError(_("All '{}' variables shall be set in "
                                                       "order to calculate {}").format(
                                                           param.type,
                                                           fill.target))
                if param.type == 'eole':
                    if HIGH_COMPATIBILITY and param.text.startswith('container_ip'):
                        if param.optional is True:
                            param_option_indexes.add(fill_idx)
                    try:
                        param.text = self.paths.get_variable_path(param.text, namespace)
                    except CreoleDictConsistencyError:
                        if param.optional is not True:
                            # bare raise keeps the original traceback
                            raise
                        param_option_indexes.add(fill_idx)
            # pop from the end so the remaining indexes stay valid
            for param_idx in sorted(param_option_indexes, reverse=True):
                fill.param.pop(param_idx)
        self.has_calc.append(fill.target)
def filter_target(self, space, namespace): # pylint: disable=C0111
    """Resolve the targets of ``space`` to full paths, in place.

    ``space`` exposes ``source`` and ``target`` (a list of objects with a
    ``type`` and a ``name``):

    * a 'variable' target flagged ``optional`` whose name is not defined in
      ``self.paths`` is removed from ``space.target``;
    * any other 'variable' target gets its ``name`` replaced by the variable
      path looked up in ``namespace``;
    * a 'family' target gets its ``name`` replaced by the family path;
    * targets of any other type are left untouched.

    Raises:
        CreoleDictConsistencyError: when a 'variable' target has the same
            name as ``space.source`` or when a family cannot be found.
    """
    kept = []
    for target in space.target:
        if target.type == 'variable':
            if (getattr(target, 'optional', False) is True and
                    not self.paths.path_is_defined(target.name)):
                # optional target on an unknown variable: drop it
                continue
            if space.source == target.name:
                raise CreoleDictConsistencyError(_('target name and source name must be different: {}').format(space.source))
            target.name = self.paths.get_variable_path(target.name, namespace)
        elif target.type == 'family':
            try:
                target.name = self.paths.get_family_path(target.name, namespace)
            except KeyError:
                raise CreoleDictConsistencyError(_('cannot found family {}').format(target.name))
        kept.append(target)
    if len(kept) != len(space.target):
        # slice assignment keeps the same list object for other holders
        space.target[:] = kept
def filter_condition(self): # pylint: disable=C0111
    """Normalise ``self.space.constraints.condition`` in place.

    Does nothing when the space has no ``constraints`` or no ``condition``.
    Otherwise the conditions are processed in successive passes:

    1. for every 'servicelist' target, two extra targets are added
       ('service_accesslist' and 'service_restrictionlist'), both named
       '___auto_<name>';
    2. in HIGH_COMPATIBILITY mode, conditions without target are marked for
       removal;
    3. 'hidden_if_in' / 'hidden_if_not_in' are renamed to 'disabled_if_in' /
       'disabled_if_not_in'; a condition with ``fallback`` whose source is
       not defined is applied directly on its targets and marked for removal;
    4. target names are resolved with ``self.filter_target``;
    5. '*list' targets are replaced by one 'variable' or 'family' target per
       entry of ``self.objectspace.list_conditions``;
    6. the source is resolved to its path, param types are validated and,
       when the source has a known set of choices, params outside of it are
       dropped; a condition left without param is applied statically on its
       targets and marked for removal;
    7. marked conditions are removed, then the properties of the remaining
       targets are reset and the bookkeeping lists
       (``self.has_frozen_if_in_condition``, ``self.force_not_mandatory``)
       are filled.

    Raises:
        CreoleDictConsistencyError: when a param type is not in
            ``TYPE_PARAM_CONDITION`` (``self.filter_target`` is also called
            and may raise).
        Exception: when a target type is neither 'variable', 'family' nor a
            name ending with 'list'.
    """
    if not hasattr(self.space, 'constraints') or not hasattr(self.space.constraints, 'condition'):
        return
    space = self.space.constraints.condition
    # indexes (in space) of the conditions to delete once all passes are done
    remove_conditions = []
    # names of targets already handled by a fallback condition
    fallback_variables = []
    fallback_lists = []
    # automatic generation of the service_access lists
    # and the service_restriction lists from the servicelist
    for condition in space:
        if hasattr(condition, 'target'):
            new_targets = []
            for target in condition.target:
                if target.type == 'servicelist':
                    new_target = copy(target)
                    new_target.type = 'service_accesslist'
                    new_target.name = '___auto_{}'.format(new_target.name)
                    new_targets.append(new_target)
                    new_target = copy(target)
                    new_target.type = 'service_restrictionlist'
                    new_target.name = '___auto_{}'.format(new_target.name)
                    new_targets.append(new_target)
            # extend after the loop so the list is not modified while iterated
            condition.target.extend(new_targets)
    # remove conditions without target
    if HIGH_COMPATIBILITY:
        for idx, condition in enumerate(space):
            if not hasattr(condition, 'target'):
                remove_conditions.append(idx)
    for idx, condition in enumerate(space):
        if idx in remove_conditions:
            continue
        # "hidden" conditions are handled as "disabled" ones
        if condition.name == 'hidden_if_in':
            condition.name = 'disabled_if_in'
        elif condition.name == 'hidden_if_not_in':
            condition.name = 'disabled_if_not_in'
        # a condition with a fallback **and** the source variable doesn't exist
        if (hasattr(condition, 'fallback') and condition.fallback is True and
                not self.paths.path_is_defined(condition.source)):
            for target in condition.target:
                if target.type in ['variable', 'family']:
                    # only the last component of a dotted name is used for the lookup
                    name = target.name.split('.')[-1]
                    if target.type == 'variable':
                        variable = self.get_variable(name)
                    else:
                        variable = self.paths.get_family_obj(name)
                    if condition.name in ['disabled_if_in']:
                        variable.disabled = True
                    if condition.name in ['mandatory_if_in']:
                        variable.mandatory = True
                    if condition.name in ['disabled_if_in', 'disabled_if_not_in',
                                          'frozen_if_in', 'frozen_if_not_in']:
                        variable.hidden = False
                    if HIGH_COMPATIBILITY:
                        fallback_variables.append(name)
                else:
                    # any other target type must be a "*list"
                    listname = target.type
                    if not listname.endswith('list'):
                        raise Exception('not yet implemented')
                    listvars = self.objectspace.list_conditions.get(listname,
                                                                    {}).get(target.name)
                    if listvars:
                        for listvar in listvars:
                            # a list entry is looked up as a variable first, then as a family
                            try:
                                variable = self.get_variable(listvar)
                            except CreoleDictConsistencyError:
                                variable = self.paths.get_family_obj(listvar)
                            if condition.name in ['disabled_if_in']:
                                variable.disabled = True
                            if condition.name in ['mandatory_if_in']:
                                variable.mandatory = True
                            if condition.name in ['disabled_if_in', 'disabled_if_not_in',
                                                  'frozen_if_in', 'frozen_if_not_in']:
                                variable.hidden = False
                            fallback_lists.append(listvar)
            # the fallback has been applied: the condition itself is dropped
            remove_conditions.append(idx)
    # resolve the target names of the conditions that are kept
    for condition_idx, condition in enumerate(space):
        if condition_idx in remove_conditions:
            continue
        namespace = condition.namespace
        self.filter_target(condition, namespace)
    # transform *list to variable or family
    for condition_idx, condition in enumerate(space):
        if condition.name in ['disabled_if_in', 'disabled_if_not_in', 'frozen_if_in', 'auto_frozen_if_in',
                              'frozen_if_not_in', 'mandatory_if_in', 'mandatory_if_not_in']:
            new_targets = []
            remove_targets = []
            if not hasattr(condition, 'target'):
                continue
            for target_idx, target in enumerate(condition.target):
                if target.type not in ['variable', 'family']:
                    listname = target.type
                    if not listname.endswith('list'):
                        raise Exception('not yet implemented')
                    listvars = self.objectspace.list_conditions.get(listname,
                                                                    {}).get(target.name)
                    if listvars:
                        for listvar in listvars:
                            # entries already handled by a fallback condition are skipped
                            if listvar in fallback_lists:
                                continue
                            try:
                                variable = self.get_variable(listvar)
                                type_ = 'variable'
                            except CreoleDictConsistencyError:
                                variable = self.paths.get_family_obj(listvar)
                                type_ = 'family'
                            new_target = self.objectspace.target()
                            new_target.type = type_
                            new_target.name = listvar
                            new_target.index = target.index
                            new_targets.append(new_target)
                    # the *list target itself is replaced by the targets built above
                    remove_targets.append(target_idx)
            remove_targets = list(set(remove_targets))
            # pop from the end so the remaining indexes stay valid
            remove_targets.sort(reverse=True)
            for target_idx in remove_targets:
                condition.target.pop(target_idx)
            condition.target.extend(new_targets)
    # condition name -> target names whose property was set statically below
    force_remove_targets = {}
    for condition_idx, condition in enumerate(space):
        if condition_idx in remove_conditions:
            continue
        namespace = condition.namespace
        # fetch the source object before condition.source is replaced by its path
        src_variable = self.paths.get_variable_obj(condition.source)
        condition.source = self.paths.get_variable_path(condition.source, namespace, allow_source=True)
        for param in condition.param:
            if param.type not in TYPE_PARAM_CONDITION:
                raise CreoleDictConsistencyError(_('cannot use {} type as a param '
                                                   'in a condition').format(param.type))
        if condition.name in ['disabled_if_in', 'disabled_if_not_in', 'frozen_if_in', 'auto_frozen_if_in',
                              'frozen_if_not_in', 'mandatory_if_in', 'mandatory_if_not_in']:
            valid_enum = None
            # remove condition for ChoiceOption that don't have param
            if condition.source in self.valid_enums and \
                    self.valid_enums[condition.source]['type'] == 'string':
                valid_enum = self.valid_enums[condition.source]['values']
            # the choices implied by the source type take precedence
            if src_variable.type in FORCE_CHOICE:
                valid_enum = FORCE_CHOICE[src_variable.type]
            if valid_enum is not None:
                # drop the params that are not one of the allowed choices
                remove_param = []
                for param_idx, param in enumerate(condition.param):
                    if param.text not in valid_enum:
                        remove_param.append(param_idx)
                remove_param.sort(reverse=True)
                for idx in remove_param:
                    del condition.param[idx]
                if condition.param == []:
                    # no param left: apply the "*_if_not_in" property statically
                    for target in condition.target:
                        if target.name.startswith('creole.'):
                            name = target.name.split('.')[-1]
                        else:
                            name = target.name
                        if target.type == 'variable':
                            variable = self.get_variable(name)
                        else:
                            variable = self.paths.get_family_obj(name)
                        if condition.name == 'disabled_if_not_in':
                            variable.disabled = True
                            force_remove_targets.setdefault(condition.name,
                                                            []).append(target.name)
                        elif condition.name == 'frozen_if_not_in':
                            variable.hidden = True
                            force_remove_targets.setdefault(condition.name,
                                                            []).append(target.name)
                        elif condition.name == 'mandatory_if_not_in':
                            variable.mandatory = True
                            force_remove_targets.setdefault(condition.name,
                                                            []).append(target.name)
                        elif HIGH_COMPATIBILITY and condition.name == 'disabled_if_in':
                            variable.hidden = False
                    remove_conditions.append(condition_idx)
    # delete every condition marked for removal, last index first
    remove_conditions = list(set(remove_conditions))
    remove_conditions.sort(reverse=True)
    for idx in remove_conditions:
        space.pop(idx)
    for condition_idx, condition in enumerate(space):
        if condition.name in ['disabled_if_in', 'disabled_if_not_in', 'frozen_if_in', 'auto_frozen_if_in',
                              'frozen_if_not_in', 'mandatory_if_in', 'mandatory_if_not_in']:
            remove_targets = []
            #parse each variable and family
            for target_idx, target in enumerate(condition.target):
                # flagged for removal, but still processed below
                if target.name in force_remove_targets.get(condition.name, []):
                    remove_targets.append(target_idx)
                if target.name.startswith('creole.'):
                    name = target.name.split('.')[-1]
                else:
                    name = target.name
                if target.type == 'variable':
                    variable = self.get_variable(name)
                else:
                    variable = self.paths.get_family_obj(name)
                # targets already handled by a fallback condition are dropped as is
                if name in fallback_variables:
                    remove_targets.append(target_idx)
                    continue
                if condition.name in ['disabled_if_in', 'disabled_if_not_in',
                                      'frozen_if_in', 'frozen_if_not_in']:
                    variable.hidden = False
                if condition.name in ['mandatory_if_in', 'mandatory_if_not_in']:
                    variable.mandatory = False
                if HIGH_COMPATIBILITY and condition.name in ['frozen_if_in',
                                                              'frozen_if_not_in']:
                    self.has_frozen_if_in_condition.append(name)
                if condition.name in ['mandatory_if_in', 'mandatory_if_not_in']:
                    self.force_not_mandatory.append(target.name)
            # a target can be flagged twice above, hence the set
            remove_targets = list(set(remove_targets))
            remove_targets.sort(reverse=True)
            for target_idx in remove_targets:
                condition.target.pop(target_idx)