# coding: utf-8 from copy import copy from typing import List from collections import OrderedDict from os.path import join, basename from ast import literal_eval import imp from .i18n import _ from .utils import normalize_family from .error import CreoleDictConsistencyError from .xmlreflector import HIGH_COMPATIBILITY #mode order is important modes_level = ('basic', 'normal', 'expert') class Mode(object): def __init__(self, name, level): self.name = name self.level = level def __cmp__(self, other): return cmp(self.level, other.level) def __eq__(self, other): return self.level == other.level def __ne__(self, other): return self.level != other.level def __gt__(self, other): return other.level < self.level def __ge__(self, other): return not self.level < other.level def __le__(self, other): return not other.level < self.level def mode_factory(): mode_obj = {} for idx in range(len(modes_level)): name = modes_level[idx] mode_obj[name] = Mode(name, idx) return mode_obj modes = mode_factory() # a CreoleObjSpace's attribute has some annotations # that shall not be present in the exported (flatened) XML ERASED_ATTRIBUTES = ('redefine', 'exists', 'fallback', 'optional', 'remove_check', 'namespace', 'remove_condition', 'path', 'instance_mode', 'index', 'is_in_leadership', 'level', 'submulti') # , '_real_container') ERASED_CONTAINER_ATTRIBUTES = ('id', 'container', 'group_id', 'group', 'container_group') FORCE_CHOICE = {'oui/non': ['oui', 'non'], 'on/off': ['on', 'off'], 'yes/no': ['yes', 'no'], 'schedule': ['none', 'daily', 'weekly', 'monthly'], 'schedulemod': ['pre', 'post']} KEY_TYPE = {'variable': 'symlink', 'SymLinkOption': 'symlink', 'PortOption': 'port', 'UnicodeOption': 'string', 'NetworkOption': 'network', 'NetmaskOption': 'netmask', 'URLOption': 'web_address', 'FilenameOption': 'filename'} TYPE_PARAM_CHECK = ('string', 'python', 'variable') TYPE_PARAM_CONDITION = ('string', 'python', 'number', 'variable') TYPE_PARAM_FILL = ('string', 'number', 'variable') CONVERSION = {'number': int} FREEZE_AUTOFREEZE_VARIABLE = 'module_instancie' VARIABLE_NAMESPACE = 'rougail' class ServiceAnnotator: """Manage service's object for example:: 123 """ def __init__(self, objectspace): self.space = objectspace.space self.paths = objectspace.paths self.objectspace = objectspace self.grouplist_conditions = {} self.convert_services() def gen_family(self, name, ): family = self.objectspace.family() family.name = name family.doc = name family.mode = None return family def convert_services(self): if not hasattr(self.space, 'services'): return if not hasattr(self.space.services, 'service'): del self.space.services return self.space.services.hidden = True families = {} for idx, service_name in enumerate(self.space.services.service.keys()): service = self.space.services.service[service_name] new_service = self.objectspace.service() for elttype, values in vars(service).items(): if elttype == 'name' or elttype in ERASED_ATTRIBUTES: setattr(new_service, elttype, values) continue eltname = elttype + 's' family = self.gen_family(eltname) if isinstance(values, dict): values = list(values.values()) family.family = self.make_group_from_elts(elttype, values, f'services.{service_name}.{eltname}', ) setattr(new_service, elttype, family) families[service_name] = new_service self.space.services.service = families def make_group_from_elts(self, name, elts, path, ): """Splits each objects into a group (and `OptionDescription`, in tiramisu terms) and build elements and its attributes (the `Options` in tiramisu terms) """ families = [] new_elts = self._reorder_elts(name, elts, ) for index, elt_info in enumerate(new_elts): elt = elt_info['elt'] elt_name = elt_info['elt_name'] # try to launch _update_xxxx() function update_elt = '_update_' + elt_name if hasattr(self, update_elt): getattr(self, update_elt)(elt, index, path) if hasattr(elt, 'source'): c_name = elt.source else: c_name = elt.name family = self.gen_family(c_name) family.variable = [] subpath = '{}.{}'.format(path, c_name) listname = '{}list'.format(name) activate_path = '.'.join([subpath, 'activate']) if elt in self.grouplist_conditions: # FIXME transformer le activate qui disparait en boolean self.objectspace.list_conditions.setdefault(listname, {}).setdefault(self.grouplist_conditions[elt], []).append(activate_path) for key in dir(elt): if key.startswith('_') or key.endswith('_type') or key in ERASED_ATTRIBUTES: continue value = getattr(elt, key) if key == listname: self.objectspace.list_conditions.setdefault(listname, {}).setdefault( value, []).append(activate_path) continue family.variable.append(self._generate_element(elt_name, key, value, elt, f'{subpath}.{key}' )) # FIXME ne devrait pas etre True par défaut # devrait etre un calcule family.variable.append(self._generate_element(elt_name, 'activate', True, elt, activate_path, )) families.append(family) return families def _generate_element(self, elt_name, key, value, elt, path, ): variable = self.objectspace.variable() variable.name = key variable.mode = None if key == 'name': true_key = elt_name else: true_key = key dtd_key_type = true_key + '_type' if key == 'activate': type_ = 'boolean' elif hasattr(elt, dtd_key_type): type_ = KEY_TYPE[getattr(elt, dtd_key_type)] elif key in self.objectspace.booleans_attributs: type_ = 'boolean' else: type_ = 'string' variable.type = type_ if type_ == 'symlink': variable.opt = value else: variable.doc = key val = self.objectspace.value() val.name = value variable.value = [val] self.paths.add_variable('services', path, 'service', False, variable, ) return variable def _reorder_elts(self, name, elts, ): """Reorders by index the elts """ new_elts = {} # reorder elts by index for idx, elt in enumerate(elts): new_elts.setdefault(idx, []).append(elt) idxes = list(new_elts.keys()) idxes.sort() result_elts = [] for idx in idxes: for elt in new_elts[idx]: result_elts.append({'elt_name': name, 'elt': elt}) return result_elts def _update_override(self, *args): self._update_file(*args) def _update_file(self, file_, index, service_path, ): if not hasattr(file_, 'file_type') or file_.file_type == "UnicodeOption": if not hasattr(file_, 'source'): file_.source = basename(file_.name) elif not hasattr(file_, 'source'): raise CreoleDictConsistencyError(_('attribute source mandatory for file with variable name ' 'for {}').format(file_.name)) class SpaceAnnotator(object): """Transformations applied on a CreoleObjSpace instance """ def __init__(self, objectspace, eosfunc_file): self.paths = objectspace.paths self.space = objectspace.space self.objectspace = objectspace self.valid_enums = {} self.force_value = {} self.force_not_mandatory = [] if eosfunc_file: self.eosfunc = imp.load_source('eosfunc', eosfunc_file) else: self.eosfunc = None if HIGH_COMPATIBILITY: self.has_hidden_if_in_condition = [] self.convert_variable() self.convert_auto_freeze() self.convert_groups() self.filter_check() self.filter_condition() self.convert_valid_enums() self.convert_check() self.convert_fill() self.remove_empty_families() self.change_variable_mode() self.change_family_mode() self.dynamic_families() self.filter_separators() self.absolute_path_for_symlink_in_services() self.convert_helps() if hasattr(self.space, 'constraints'): del self.space.constraints.index if vars(self.space.constraints): raise Exception('constraints again?') del self.space.constraints def absolute_path_for_symlink_in_services(self): if not hasattr(self.space, 'services'): return for family_name, family in vars(self.space.services).items(): if not isinstance(family, dict): continue for fam in family.values(): for fam1_name, fam1 in vars(fam).items(): if fam1_name == 'name' or fam1_name in ERASED_ATTRIBUTES: continue for fam2 in fam1.family: for variable in fam2.variable: if variable.type == 'symlink' and '.' not in variable.name: variable.opt = self.paths.get_variable_path(variable.opt, VARIABLE_NAMESPACE, ) def convert_helps(self): # FIXME l'aide doit etre dans la variable! if not hasattr(self.space, 'help'): return helps = self.space.help if hasattr(helps, 'variable'): for hlp in helps.variable.values(): variable = self.paths.get_variable_obj(hlp.name) variable.help = hlp.text if hasattr(helps, 'family'): for hlp in helps.family.values(): variable = self.paths.get_family_obj(hlp.name) variable.help = hlp.text del self.space.help def manage_leader(self, leader_space: 'Leadership', leader_family_name: str, leader_name: str, namespace: str, variable: 'Variable', group: 'Group', leader_fullname: str, ) -> None: # manage leader's variable if variable.multi is not True: raise CreoleDictConsistencyError(_('the variable {} in a group must be multi').format(variable.name)) leader_space.variable = [] leader_space.name = leader_name leader_space.hidden = variable.hidden if variable.hidden: leader_is_hidden = True variable.frozen = True variable.force_default_on_freeze = True else: leader_is_hidden = False variable.hidden = None if hasattr(group, 'description'): leader_space.doc = group.description else: leader_space.doc = variable.description leader_path = namespace + '.' + leader_family_name + '.' + leader_name self.paths.add_family(namespace, leader_path, leader_space, ) leader_family = self.space.variables[namespace].family[leader_family_name] leader_family.variable[leader_name] = leader_space leader_space.variable.append(variable) self.paths.set_leader(namespace, leader_family_name, leader_name, leader_name, ) leader_space.path = leader_fullname return leader_is_hidden def manage_follower(self, namespace: str, leader_family_name: str, variable: 'Variable', leader_name: str, follower_names: List[str], leader_space: 'Leadership', leader_is_hidden: bool, ) -> None: if variable.name != follower_names[0]: raise CreoleDictConsistencyError(_('cannot found this follower {}').format(follower_names[0])) follower_names.remove(variable.name) if leader_is_hidden: variable.frozen = True variable.force_default_on_freeze = True # followers are multi if not variable.multi: raise CreoleDictConsistencyError(_('the variable {} in a group must be multi or submulti').format(variable.name)) leader_space.variable.append(variable) # pylint: disable=E1101 self.paths.set_leader(namespace, leader_family_name, variable.name, leader_name, ) def convert_groups(self): # pylint: disable=C0111 if hasattr(self.space, 'constraints') and hasattr(self.space.constraints, 'group'): for group in self.space.constraints.group: leader_fullname = group.leader follower_names = list(group.follower.keys()) leader_family_name = self.paths.get_variable_family_name(leader_fullname) namespace = self.paths.get_variable_namespace(leader_fullname) leader_name = self.paths.get_variable_name(leader_fullname) ori_leader_family = self.space.variables[namespace].family[leader_family_name] has_a_leader = False for variable in list(ori_leader_family.variable.values()): if isinstance(variable, self.objectspace.Leadership): # append follower to an existed leadership if variable.name == leader_name: leader_space = variable has_a_leader = True else: if has_a_leader: # it's a follower self.manage_follower(namespace, leader_family_name, variable, leader_name, follower_names, leader_space, leader_is_hidden, ) ori_leader_family.variable.pop(variable.name) if follower_names == []: # no more follower break elif variable.name == leader_name: # it's a leader leader_space = self.objectspace.Leadership() leader_is_hidden = self.manage_leader(leader_space, leader_family_name, leader_name, namespace, variable, group, leader_fullname, ) has_a_leader = True else: raise CreoleDictConsistencyError(_('cannot found followers {}').format(follower_names)) del self.space.constraints.group def remove_empty_families(self): # pylint: disable=C0111,R0201 if hasattr(self.space, 'variables'): for family in self.space.variables.values(): if hasattr(family, 'family'): space = family.family removed_families = [] for family_name, family in space.items(): if not hasattr(family, 'variable') or len(family.variable) == 0: removed_families.append(family_name) del space[family_name] # remove help too if hasattr(self.space, 'help') and hasattr(self.space.help, 'family'): for family in self.space.help.family.keys(): if family in removed_families: del self.space.help.family[family] def change_family_mode(self): # pylint: disable=C0111 if not hasattr(self.space, 'variables'): return for family in self.space.variables.values(): if hasattr(family, 'family'): for family in family.family.values(): mode = modes_level[-1] for variable in family.variable.values(): if isinstance(variable, self.objectspace.Leadership): variable_mode = variable.variable[0].mode variable.variable[0].mode = None variable.mode = variable_mode else: variable_mode = variable.mode if variable_mode is not None and modes[mode] > modes[variable_mode]: mode = variable_mode family.mode = mode def dynamic_families(self): # pylint: disable=C0111 if not hasattr(self.space, 'variables'): return for family in self.space.variables.values(): if hasattr(family, 'family'): for family in family.family.values(): if 'dynamic' in vars(family): namespace = self.paths.get_variable_namespace(family.dynamic) varpath = self.paths.get_variable_path(family.dynamic, namespace) family.dynamic = varpath def annotate_variable(self, variable, family_mode, path, is_follower=False): # if the variable is mandatory and doesn't have any value # then the variable's mode is set to 'basic' has_value = hasattr(variable, 'value') if variable.mandatory is True and (not has_value or is_follower): variable.mode = modes_level[0] if variable.mode != None and modes[variable.mode] < modes[family_mode] and (not is_follower or variable.mode != modes_level[0]): variable.mode = family_mode if has_value and path not in self.force_not_mandatory: variable.mandatory = True if variable.hidden is True: variable.frozen = True if not variable.auto_save is True and 'force_default_on_freeze' not in vars(variable): variable.force_default_on_freeze = True def convert_variable(self): if not hasattr(self.space, 'variables'): return for families in self.space.variables.values(): if hasattr(families, 'family'): for family in families.family.values(): if hasattr(family, 'variable'): for variable in family.variable.values(): if not hasattr(variable, 'type'): variable.type = 'string' if variable.type != 'symlink' and not hasattr(variable, 'description'): variable.description = variable.name if variable.submulti: variable.multi = 'submulti' def convert_auto_freeze(self): # pylint: disable=C0111 if hasattr(self.space, 'variables'): for variables in self.space.variables.values(): if hasattr(variables, 'family'): for family in variables.family.values(): if hasattr(family, 'variable'): for variable in family.variable.values(): if variable.auto_freeze: new_condition = self.objectspace.condition() new_condition.name = 'auto_hidden_if_not_in' new_condition.namespace = variables.name new_condition.source = FREEZE_AUTOFREEZE_VARIABLE new_param = self.objectspace.param() new_param.text = 'oui' new_condition.param = [new_param] new_target = self.objectspace.target() new_target.type = 'variable' if variables.name == VARIABLE_NAMESPACE: path = variable.name else: path = variable.namespace + '.' + family.name + '.' + variable.name new_target.name = path new_condition.target = [new_target] if not hasattr(self.space.constraints, 'condition'): self.space.constraints.condition = [] self.space.constraints.condition.append(new_condition) def _set_valid_enum(self, variable, values, type_): variable.mandatory = True variable.choice = [] choices = [] for value in values: choice = self.objectspace.choice() try: if type_ in CONVERSION: choice.name = CONVERSION[type_](value) else: choice.name = str(value) except: raise CreoleDictConsistencyError(_(f'unable to change type of a valid_enum entry "{value}" is not a valid "{type_}" for "{variable.name}"')) choices.append(choice.name) choice.type = type_ variable.choice.append(choice) if not variable.choice: raise CreoleDictConsistencyError(_('empty valid enum is not allowed for variable {}').format(variable.name)) if hasattr(variable, 'value'): for value in variable.value: value.type = type_ if type_ in CONVERSION: cvalue = CONVERSION[type_](value.name) else: cvalue = value.name if cvalue not in choices: raise CreoleDictConsistencyError(_('value "{}" of variable "{}" is not in list of all expected values ({})').format(value.name, variable.name, choices)) else: new_value = self.objectspace.value() new_value.name = values[0] new_value.type = type_ variable.value = [new_value] variable.type = 'choice' def _convert_valid_enum(self, variable, path): if variable.type in FORCE_CHOICE: if path in self.valid_enums: raise CreoleDictConsistencyError(_('cannot set valid enum for variable with type {}').format(variable.type)) self._set_valid_enum(variable, FORCE_CHOICE[variable.type], 'string') if path in self.valid_enums: values = self.valid_enums[path]['values'] self._set_valid_enum(variable, values, variable.type) del self.valid_enums[path] if path in self.force_value: new_value = self.objectspace.value() new_value.name = self.force_value[path] variable.value = [new_value] del self.force_value[path] def convert_valid_enums(self): # pylint: disable=C0111 if not hasattr(self.space, 'variables'): return for variables in self.space.variables.values(): namespace = variables.name if hasattr(variables, 'family'): for family in variables.family.values(): if hasattr(family, 'variable'): for variable in family.variable.values(): if isinstance(variable, self.objectspace.Leadership): for follower in variable.variable: path = '{}.{}.{}.{}'.format(namespace, family.name, variable.name, follower.name) self._convert_valid_enum(follower, path) else: path = '{}.{}.{}'.format(namespace, family.name, variable.name) self._convert_valid_enum(variable, path) # valid_enums must be empty now (all information are store in objects) if self.valid_enums: raise CreoleDictConsistencyError(_('valid_enum sets for unknown variables {}').format(self.valid_enums.keys())) def change_variable_mode(self): # pylint: disable=C0111 if not hasattr(self.space, 'variables'): return for variables in self.space.variables.values(): if hasattr(variables, 'family'): for family in variables.family.values(): family_mode = family.mode if hasattr(family, 'variable'): for variable in family.variable.values(): if isinstance(variable, self.objectspace.Leadership): mode = modes_level[-1] for follower in variable.variable: if follower.auto_save is True: raise CreoleDictConsistencyError(_('leader/followers {} ' 'could not be ' 'auto_save').format(follower.name)) if follower.auto_freeze is True: raise CreoleDictConsistencyError(_('leader/followers {} ' 'could not be ' 'auto_freeze').format(follower.name)) if HIGH_COMPATIBILITY and variable.name != follower.name: # and variable.variable[0].mode != modes_level[0]: is_follower = True else: is_follower = False path = '{}.{}.{}'.format(family.path, variable.name, follower.name) self.annotate_variable(follower, family_mode, path, is_follower) # leader's mode is minimum level if modes[variable.variable[0].mode] > modes[follower.mode]: follower.mode = variable.variable[0].mode variable.mode = variable.variable[0].mode else: # auto_save's variable is set in 'basic' mode if its mode is 'normal' if variable.auto_save is True and variable.mode != modes_level[-1]: variable.mode = modes_level[0] # auto_freeze's variable is set in 'basic' mode if its mode is 'normal' if variable.auto_freeze is True and variable.mode != modes_level[-1]: variable.mode = modes_level[0] path = '{}.{}'.format(family.path, variable.name) self.annotate_variable(variable, family_mode, path) def convert_fill(self): # pylint: disable=C0111,R0912 if not hasattr(self.space, 'constraints') or not hasattr(self.space.constraints, 'fill'): return # sort fill/auto by index fills = {fill.index: fill for idx, fill in enumerate(self.space.constraints.fill)} indexes = list(fills.keys()) indexes.sort() targets = [] eosfunc = dir(self.eosfunc) for idx in indexes: fill = fills[idx] # test if it's redefined calculation if fill.target in targets and not fill.redefine: raise CreoleDictConsistencyError(_(f"A fill already exists for the target: {fill.target}")) targets.append(fill.target) # if not fill.name in eosfunc: raise CreoleDictConsistencyError(_('cannot find fill function {}').format(fill.name)) namespace = fill.namespace # let's replace the target by the path fill.target = self.paths.get_variable_path(fill.target, namespace) value = self.objectspace.value() value.type = 'calculation' value.name = fill.name if hasattr(fill, 'param'): param_to_delete = [] for fill_idx, param in enumerate(fill.param): if param.type not in TYPE_PARAM_FILL: raise CreoleDictConsistencyError(_(f'cannot use {param.type} type as a param in a fill/auto')) if param.type != 'string' and not hasattr(param, 'text'): raise CreoleDictConsistencyError(_(f"All '{param.type}' variables shall have a value in order to calculate {fill.target}")) if param.type == 'variable': try: param.text, suffix = self.paths.get_variable_path(param.text, namespace, with_suffix=True) if suffix: param.suffix = suffix except CreoleDictConsistencyError as err: if param.optional is False: raise err param_to_delete.append(fill_idx) continue if param.hidden is True: param.transitive = False param.hidden = None param_to_delete.sort(reverse=True) for param_idx in param_to_delete: fill.param.pop(param_idx) value.param = fill.param variable = self.paths.get_variable_obj(fill.target) variable.value = [value] del self.space.constraints.fill def filter_separators(self): # pylint: disable=C0111,R0201 # FIXME devrait etre dans la variable if not hasattr(self.space, 'variables'): return for family in self.space.variables.values(): if (hasattr(family, 'separators') and hasattr(family.separators, 'separator')): space = family.separators.separator names = [] for idx, separator in enumerate(space): namespace = self.paths.get_variable_namespace(separator.name) subpath = self.paths.get_variable_path(separator.name, namespace) separator.name = subpath if separator.name in names: raise CreoleDictConsistencyError(_('{} already has a separator').format(separator.name)) names.append(separator.name) def load_params_in_validenum(self, param): if param.type in ['string', 'python', 'number']: if not hasattr(param, 'text') and (param.type == 'python' or param.type == 'number'): raise CreoleDictConsistencyError(_("All '{}' variables shall be set in order to calculate {}").format(param.type, 'valid_enum')) if param.type in ['string', 'number']: try: values = literal_eval(param.text) except ValueError: raise CreoleDictConsistencyError(_('Cannot load {}').format(param.text)) elif param.type == 'python': try: #values = eval(param.text, {'eosfunc': self.eosfunc, '__builtins__': {'range': range, 'str': str}}) values = eval(param.text, {'eosfunc': self.eosfunc, '__builtins__': {'range': range, 'str': str}}) except NameError: raise CreoleDictConsistencyError(_('The function {} is unknown').format(param.text)) if not isinstance(values, list): raise CreoleDictConsistencyError(_('Function {} shall return a list').format(param.text)) new_values = [] for val in values: new_values.append(val) values = new_values else: values = param.text return values def filter_check(self): # pylint: disable=C0111 # valid param in check if not hasattr(self.space, 'constraints') or not hasattr(self.space.constraints, 'check'): return space = self.space.constraints.check remove_indexes = [] for check_idx, check in enumerate(space): namespace = check.namespace if hasattr(check, 'param'): param_option_indexes = [] for idx, param in enumerate(check.param): if param.type not in TYPE_PARAM_CHECK: raise CreoleDictConsistencyError(_('cannot use {} type as a param in check for {}').format(param.type, check.target)) if param.type == 'variable': try: param.text = self.paths.get_variable_path(param.text, namespace) except CreoleDictConsistencyError as err: if param.optional is True: param_option_indexes.append(idx) else: raise err param_option_indexes = list(set(param_option_indexes)) param_option_indexes.sort(reverse=True) for idx in param_option_indexes: check.param.pop(idx) if not HIGH_COMPATIBILITY and check.param == []: remove_indexes.append(check_idx) remove_indexes.sort(reverse=True) for idx in remove_indexes: del space[idx] variables = {} for index, check in enumerate(space): namespace = check.namespace if HIGH_COMPATIBILITY: if not self.paths.path_is_defined(check.target): continue check.is_in_leadership = self.paths.get_leader(check.target) != None # let's replace the target by the path check.target = self.paths.get_variable_path(check.target, namespace) if check.target not in variables: variables[check.target] = [] variables[check.target].append((index, check)) # remove check already set for a variable remove_indexes = [] for checks in variables.values(): names = {} for idx, check in checks: if HIGH_COMPATIBILITY and check.name == 'valid_enum': redefine = True else: redefine = False #redefine = bool(check.redefine) if redefine and check.name in names: remove_indexes.append(names[check.name]) del names[check.name] names[check.name] = idx del check.index remove_indexes.sort(reverse=True) for idx in remove_indexes: del space[idx] remove_indexes = [] functions = dir(self.eosfunc) functions.extend(['valid_enum', 'valid_in_network', 'valid_differ']) for idx, check in enumerate(space): if not check.name in functions: raise CreoleDictConsistencyError(_('cannot find check function {}').format(check.name)) #is_probe = not check.name in self.eosfunc.func_on_zephir_context #if is_probe: # raise CreoleDictConsistencyError(_('cannot have a check with probe function ({})').format(check.name)) if check.name == 'valid_enum': proposed_value_type = False remove_params = [] for param_idx, param in enumerate(check.param): if hasattr(param, 'name') and param.name == 'checkval': try: proposed_value_type = self.objectspace.convert_boolean(param.text) == False remove_params.append(param_idx) except TypeError as err: raise CreoleDictConsistencyError(_('cannot load checkval value for variable {}: {}').format(check.target, err)) if proposed_value_type: # no more supported raise CreoleDictConsistencyError(_('cannot load checkval value for variable {}, no more supported').format(check.target)) remove_params.sort(reverse=True) for param_idx in remove_params: del check.param[param_idx] if len(check.param) != 1: raise CreoleDictConsistencyError(_('cannot set more than one param ' 'for valid_enum for variable {}' '').format(check.target)) param = check.param[0] if proposed_value_type: if param.type == 'variable': try: values = self.load_params_in_validenum(param) except NameError as err: raise CreoleDictConsistencyError(_('cannot load value for variable {}: {}').format(check.target, err)) add_value = True if HIGH_COMPATIBILITY and check.is_in_leadership: add_value = False if add_value and values: self.force_value[check.target] = values[0] else: if check.target in self.valid_enums: raise CreoleDictConsistencyError(_('valid_enum already set for {}' '').format(check.target)) values = self.load_params_in_validenum(param) self.valid_enums[check.target] = {'type': param.type, 'values': values} remove_indexes.append(idx) remove_indexes.sort(reverse=True) for idx in remove_indexes: del space[idx] #convert level to "warnings_only" and hidden to "transitive" for check in space: if check.level == 'warning': check.warnings_only = True else: check.warnings_only = False check.level = None if hasattr(check, 'param'): for param in check.param: if not param.hidden is True: check.transitive = False param.hidden = None if not self.space.constraints.check: del self.space.constraints.check def convert_check(self): if not hasattr(self.space, 'constraints') or not hasattr(self.space.constraints, 'check'): return for check in self.space.constraints.check: variable = self.paths.get_variable_obj(check.target) check_ = self.objectspace.check() name = check.name if name == 'valid_differ': name = 'valid_not_equal' elif name == 'valid_network_netmask': params_len = 1 if len(check.param) != params_len: raise CreoleDictConsistencyError(_('{} must have {} param').format(name, params_len)) elif name == 'valid_ipnetmask': params_len = 1 if len(check.param) != params_len: raise CreoleDictConsistencyError(_('{} must have {} param').format(name, params_len)) name = 'valid_ip_netmask' elif name == 'valid_broadcast': params_len = 2 if len(check.param) != params_len: raise CreoleDictConsistencyError(_('{} must have {} param').format(name, params_len)) elif name == 'valid_in_network': params_len = 2 if len(check.param) != params_len: raise CreoleDictConsistencyError(_('{} must have {} param').format(name, params_len)) check_.name = name check_.warnings_only = check.warnings_only if hasattr(check, 'param'): check_.param = check.param if not hasattr(variable, 'check'): variable.check = [] variable.check.append(check_) del self.space.constraints.check def filter_targets(self): # pylint: disable=C0111 for condition_idx, condition in enumerate(self.space.constraints.condition): namespace = condition.namespace del_idx = [] for idx, target in enumerate(condition.target): if target.type == 'variable': if (hasattr(target, 'optional') and target.optional is True and not self.paths.path_is_defined(target.name)): del_idx.append(idx) continue if condition.source == target.name: raise CreoleDictConsistencyError(_('target name and source name must be different: {}').format(condition.source)) target.name = self.paths.get_variable_path(target.name, namespace) elif target.type == 'family': try: target.name = self.paths.get_family_path(target.name, namespace) except KeyError: raise CreoleDictConsistencyError(_('cannot found family {}').format(target.name)) del_idx = list(set(del_idx)) del_idx.sort(reverse=True) for idx in del_idx: condition.target.pop(idx) def convert_xxxlist_to_variable(self): # pylint: disable=C0111 # transform *list to variable or family for condition_idx, condition in enumerate(self.space.constraints.condition): new_targets = [] remove_targets = [] for target_idx, target in enumerate(condition.target): if target.type not in ['variable', 'family']: listname = target.type if not listname.endswith('list'): raise Exception('not yet implemented') listvars = self.objectspace.list_conditions.get(listname, {}).get(target.name) if listvars: for listvar in listvars: variable = self.paths.get_variable_obj(listvar) type_ = 'variable' new_target = self.objectspace.target() new_target.type = type_ new_target.name = listvar new_target.index = target.index new_targets.append(new_target) remove_targets.append(target_idx) remove_targets = list(set(remove_targets)) remove_targets.sort(reverse=True) for target_idx in remove_targets: condition.target.pop(target_idx) condition.target.extend(new_targets) def check_condition(self): for condition in self.space.constraints.condition: if condition.name not in ['disabled_if_in', 'disabled_if_not_in', 'hidden_if_in', 'auto_hidden_if_not_in', 'hidden_if_not_in', 'mandatory_if_in', 'mandatory_if_not_in']: raise CreoleDictConsistencyError(_(f'unknown condition {condition.name}')) def check_params(self): for condition in self.space.constraints.condition: for param in condition.param: if param.type not in TYPE_PARAM_CONDITION: raise CreoleDictConsistencyError(_(f'cannot use {param.type} type as a param in a condition')) def check_target(self): for condition in self.space.constraints.condition: if not hasattr(condition, 'target'): raise CreoleDictConsistencyError(_('target is mandatory in condition')) for target in condition.target: if target.type.endswith('list') and condition.name not in ['disabled_if_in', 'disabled_if_not_in']: raise CreoleDictConsistencyError(_(f'target in condition for {target.type} not allow in {condition.name}')) def check_condition_fallback_not_exists(self): # a condition with a fallback **and** the source variable doesn't exist remove_conditions = [] for idx, condition in enumerate(self.space.constraints.condition): if condition.fallback is True and not self.paths.path_is_defined(condition.source): for target in condition.target: if target.type in ['variable', 'family']: if target.name.startswith(VARIABLE_NAMESPACE + '.'): name = target.name.split('.')[-1] else: name = target.name if target.type == 'variable': variable = self.paths.get_variable_obj(name) else: variable = self.paths.get_family_obj(name) if condition.name == 'disabled_if_in': variable.disabled = True if condition.name == 'mandatory_if_in': variable.mandatory = True if condition.name == 'hidden_if_in': variable.hidden = True else: listname = target.type listvars = self.objectspace.list_conditions.get(listname, {}).get(target.name, None) if listvars is not None: for listvar in listvars: variable = self.paths.get_variable_obj(listvar) if condition.name in ['disabled_if_in']: variable.value[0].name = False del self.objectspace.list_conditions[listname][target.name] remove_conditions.append(idx) remove_conditions = list(set(remove_conditions)) remove_conditions.sort(reverse=True) for idx in remove_conditions: self.space.constraints.condition.pop(idx) def check_choice_option_condition(self, force_remove_targets): # remove condition for ChoiceOption that don't have param remove_conditions = [] for condition_idx, condition in enumerate(self.space.constraints.condition): namespace = condition.namespace src_variable = self.paths.get_variable_obj(condition.source) condition.source = self.paths.get_variable_path(condition.source, namespace, allow_source=True) valid_enum = None if condition.source in self.valid_enums and \ self.valid_enums[condition.source]['type'] == 'string': valid_enum = self.valid_enums[condition.source]['values'] if src_variable.type in FORCE_CHOICE: valid_enum = FORCE_CHOICE[src_variable.type] if valid_enum is not None: remove_param = [] for param_idx, param in enumerate(condition.param): if param.text not in valid_enum: remove_param.append(param_idx) remove_param.sort(reverse=True) for idx in remove_param: del condition.param[idx] if condition.param == []: for target in condition.target: if target.name.startswith(f'{VARIABLE_NAMESPACE}.'): name = target.name.split('.')[-1] else: name = target.name if target.type == 'variable': variable = self.paths.get_variable_obj(name) else: variable = self.paths.get_family_obj(name) if condition.name == 'disabled_if_not_in': variable.disabled = True force_remove_targets.setdefault(condition.name, []).append(target.name) elif condition.name == 'hidden_if_not_in': variable.hidden = True force_remove_targets.setdefault(condition.name, []).append(target.name) elif condition.name == 'mandatory_if_not_in': variable.mandatory = True force_remove_targets.setdefault(condition.name, []).append(target.name) remove_conditions.append(condition_idx) remove_conditions = list(set(remove_conditions)) remove_conditions.sort(reverse=True) for idx in remove_conditions: self.space.constraints.condition.pop(idx) def manage_variable_property(self, force_remove_targets): for condition in self.space.constraints.condition: remove_targets = [] #parse each variable and family for target_idx, target in enumerate(condition.target): if target.name in force_remove_targets.get(condition.name, []): remove_targets.append(target_idx) if target.name.startswith(f'{VARIABLE_NAMESPACE}.'): name = target.name.split('.')[-1] else: name = target.name if target.type == 'variable': variable = self.paths.get_variable_obj(name) else: variable = self.paths.get_family_obj(name) if condition.name in ['hidden_if_in', 'hidden_if_not_in']: variable.hidden = False if condition.name in ['mandatory_if_in', 'mandatory_if_not_in']: variable.mandatory = False if HIGH_COMPATIBILITY and condition.name in ['hidden_if_in', 'hidden_if_not_in']: self.has_hidden_if_in_condition.append(name) if condition.name in ['mandatory_if_in', 'mandatory_if_not_in']: self.force_not_mandatory.append(target.name) remove_targets = list(set(remove_targets)) remove_targets.sort(reverse=True) for target_idx in remove_targets: condition.target.pop(target_idx) def remove_condition_with_empty_target(self): remove_conditions = [] for condition_idx, condition in enumerate(self.space.constraints.condition): if not condition.target: remove_conditions.append(condition_idx) remove_conditions = list(set(remove_conditions)) remove_conditions.sort(reverse=True) for idx in remove_conditions: self.space.constraints.condition.pop(idx) def filter_condition(self): # pylint: disable=C0111 if not hasattr(self.space, 'constraints') or not hasattr(self.space.constraints, 'condition'): return force_remove_targets = {} self.check_condition() self.check_params() self.check_target() self.check_condition_fallback_not_exists() self.filter_targets() self.convert_xxxlist_to_variable() self.check_choice_option_condition(force_remove_targets) self.manage_variable_property(force_remove_targets) self.remove_condition_with_empty_target() for condition in self.space.constraints.condition: inverse = condition.name.endswith('_if_not_in') if condition.name.startswith('disabled_if_'): actions = ['disabled'] elif condition.name.startswith('hidden_if_'): actions = ['frozen', 'hidden', 'force_default_on_freeze'] elif condition.name.startswith('mandatory_if_'): actions = ['mandatory'] elif condition.name == 'auto_hidden_if_not_in': actions = ['auto_frozen'] for param in condition.param: if hasattr(param, 'text'): param = param.text else: param = None for target in condition.target: if target.name.startswith(f'{VARIABLE_NAMESPACE}.'): name = target.name.split('.')[-1] else: name = target.name if target.type == 'variable': variable = self.paths.get_variable_obj(name) else: variable = self.paths.get_family_obj(name) if not hasattr(variable, 'property'): variable.property = [] for action in actions: prop = self.objectspace.property_() prop.type = 'calculation' prop.inverse = inverse prop.source = condition.source prop.expected = param prop.name = action variable.property.append(prop) del self.space.constraints.condition