# coding: utf-8 from copy import copy from typing import List from collections import OrderedDict from os.path import join, basename from ast import literal_eval import imp from .i18n import _ from .utils import normalize_family from .error import DictConsistencyError #mode order is important modes_level = ('basic', 'normal', 'expert') class Mode(object): def __init__(self, name, level): self.name = name self.level = level def __gt__(self, other): return other.level < self.level def mode_factory(): mode_obj = {} for idx in range(len(modes_level)): name = modes_level[idx] mode_obj[name] = Mode(name, idx) return mode_obj modes = mode_factory() # a CreoleObjSpace's attribute has some annotations # that shall not be present in the exported (flatened) XML ERASED_ATTRIBUTES = ('redefine', 'exists', 'fallback', 'optional', 'remove_check', 'namespace', 'remove_condition', 'path', 'instance_mode', 'index', 'is_in_leadership', 'level') # , '_real_container') ERASED_CONTAINER_ATTRIBUTES = ('id', 'container', 'group_id', 'group', 'container_group') FORCE_CHOICE = {'oui/non': ['oui', 'non'], 'on/off': ['on', 'off'], 'yes/no': ['yes', 'no'], 'schedule': ['none', 'daily', 'weekly', 'monthly'], 'schedulemod': ['pre', 'post']} KEY_TYPE = {'variable': 'symlink', 'SymLinkOption': 'symlink', 'PortOption': 'port', 'UnicodeOption': 'string', 'NetworkOption': 'network', 'NetmaskOption': 'netmask', 'URLOption': 'web_address', 'FilenameOption': 'filename'} CONVERSION = {'number': int} FREEZE_AUTOFREEZE_VARIABLE = 'module_instancie' PROPERTIES = ('hidden', 'frozen', 'auto_freeze', 'auto_save', 'force_default_on_freeze', 'force_store_value', 'disabled', 'mandatory') CONVERT_PROPERTIES = {'auto_save': ['force_store_value'], 'auto_freeze': ['force_store_value', 'auto_freeze']} RENAME_ATTIBUTES = {'description': 'doc'} INTERNAL_FUNCTIONS = ['valid_enum', 'valid_in_network', 'valid_differ', 'valid_entier'] class SpaceAnnotator: """Transformations applied on a CreoleObjSpace instance """ def __init__(self, objectspace, eosfunc_file): self.objectspace = objectspace GroupAnnotator(objectspace) ServiceAnnotator(objectspace) VariableAnnotator(objectspace) ConstraintAnnotator(objectspace, eosfunc_file, ) FamilyAnnotator(objectspace) PropertyAnnotator(objectspace) class GroupAnnotator: def __init__(self, objectspace, ): self.objectspace = objectspace if not hasattr(self.objectspace.space, 'constraints') or not hasattr(self.objectspace.space.constraints, 'group'): return self.convert_groups() def convert_groups(self): # pylint: disable=C0111 for group in self.objectspace.space.constraints.group: leader_fullname = group.leader leader_family_name = self.objectspace.paths.get_variable_family_name(leader_fullname) leader_name = self.objectspace.paths.get_variable_name(leader_fullname) namespace = self.objectspace.paths.get_variable_namespace(leader_fullname) if '.' not in leader_fullname: leader_fullname = '.'.join([namespace, leader_family_name, leader_fullname]) follower_names = list(group.follower.keys()) has_a_leader = False ori_leader_family = self.objectspace.paths.get_family_obj(leader_fullname.rsplit('.', 1)[0]) for variable in list(ori_leader_family.variable.values()): if has_a_leader: # it's a follower self.manage_follower(namespace, leader_family_name, variable, leader_name, follower_names, leader_space, leader_is_hidden, ) ori_leader_family.variable.pop(variable.name) if follower_names == []: # no more follower break elif variable.name == leader_name: # it's a leader if isinstance(variable, self.objectspace.Leadership): # append follower to an existed leadership leader_space = variable # if variable.hidden: # leader_is_hidden = True else: leader_space = self.objectspace.Leadership() leader_is_hidden = self.manage_leader(leader_space, leader_family_name, leader_name, namespace, variable, group, leader_fullname, ) has_a_leader = True else: raise DictConsistencyError(_('cannot found followers {}').format(follower_names)) del self.objectspace.space.constraints.group def manage_leader(self, leader_space: 'Leadership', leader_family_name: str, leader_name: str, namespace: str, variable: 'Variable', group: 'Group', leader_fullname: str, ) -> None: # manage leader's variable if variable.multi is not True: raise DictConsistencyError(_('the variable {} in a group must be multi').format(variable.name)) leader_space.variable = [] leader_space.name = leader_name leader_space.hidden = variable.hidden if variable.hidden: leader_is_hidden = True variable.frozen = True variable.force_default_on_freeze = True else: leader_is_hidden = False variable.hidden = None if hasattr(group, 'description'): leader_space.doc = group.description elif hasattr(variable, 'description'): leader_space.doc = variable.description else: leader_space.doc = variable.name leader_path = namespace + '.' + leader_family_name + '.' + leader_name self.objectspace.paths.add_family(namespace, leader_path, leader_space, ) leader_family = self.objectspace.space.variables[namespace].family[leader_family_name] leader_family.variable[leader_name] = leader_space leader_space.variable.append(variable) self.objectspace.paths.set_leader(namespace, leader_family_name, leader_name, leader_name, ) leader_space.path = leader_fullname return leader_is_hidden def manage_follower(self, namespace: str, leader_family_name: str, variable: 'Variable', leader_name: str, follower_names: List[str], leader_space: 'Leadership', leader_is_hidden: bool, ) -> None: if variable.name != follower_names[0]: raise DictConsistencyError(_('cannot found this follower {}').format(follower_names[0])) follower_names.remove(variable.name) if leader_is_hidden: variable.frozen = True variable.force_default_on_freeze = True leader_space.variable.append(variable) # pylint: disable=E1101 self.objectspace.paths.set_leader(namespace, leader_family_name, variable.name, leader_name, ) class ServiceAnnotator: """Manage service's object for example:: 123 """ def __init__(self, objectspace): self.objectspace = objectspace self.convert_services() def convert_services(self): if not hasattr(self.objectspace.space, 'services'): return if not hasattr(self.objectspace.space.services, 'service'): del self.objectspace.space.services return self.objectspace.space.services.hidden = True self.objectspace.space.services.name = 'services' self.objectspace.space.services.doc = 'services' families = {} for idx, service_name in enumerate(self.objectspace.space.services.service.keys()): service = self.objectspace.space.services.service[service_name] new_service = self.objectspace.service() for elttype, values in vars(service).items(): if not isinstance(values, (dict, list)) or elttype in ERASED_ATTRIBUTES: setattr(new_service, elttype, values) continue eltname = elttype + 's' path = '.'.join(['services', service_name, eltname]) family = self.gen_family(eltname, path, ) if isinstance(values, dict): values = list(values.values()) family.family = self.make_group_from_elts(service_name, elttype, values, path, ) setattr(new_service, elttype, family) new_service.doc = new_service.name families[service_name] = new_service self.objectspace.space.services.service = families def gen_family(self, name, path, ): family = self.objectspace.family() family.name = normalize_family(name) family.doc = name family.mode = None self.objectspace.paths.add_family('services', path, family, ) return family def make_group_from_elts(self, service_name, name, elts, path, ): """Splits each objects into a group (and `OptionDescription`, in tiramisu terms) and build elements and its attributes (the `Options` in tiramisu terms) """ families = [] new_elts = self._reorder_elts(name, elts, ) for index, elt_info in enumerate(new_elts): elt = elt_info['elt'] elt_name = elt_info['elt_name'] # try to launch _update_xxxx() function update_elt = '_update_' + elt_name if hasattr(self, update_elt): getattr(self, update_elt)(elt, index, path, service_name, ) idx = 0 while True: if hasattr(elt, 'source'): c_name = elt.source else: c_name = elt.name if idx: c_name += f'_{idx}' subpath = '{}.{}'.format(path, c_name) if not self.objectspace.paths.family_is_defined(subpath): break idx += 1 family = self.gen_family(c_name, subpath) family.variable = [] listname = '{}list'.format(name) activate_path = '.'.join([subpath, 'activate']) for key in dir(elt): if key.startswith('_') or key.endswith('_type') or key in ERASED_ATTRIBUTES: continue value = getattr(elt, key) if key == listname: self.objectspace.list_conditions.setdefault(listname, {}).setdefault( value, []).append(activate_path) continue family.variable.append(self._generate_element(elt_name, key, value, elt, f'{subpath}.{key}' )) # FIXME ne devrait pas etre True par défaut # devrait etre un calcule family.variable.append(self._generate_element(elt_name, 'activate', True, elt, activate_path, )) families.append(family) return families def _generate_element(self, elt_name, key, value, elt, path, ): variable = self.objectspace.variable() variable.name = normalize_family(key) variable.mode = None if key == 'name': true_key = elt_name else: true_key = key dtd_key_type = true_key + '_type' if key == 'activate': type_ = 'boolean' elif hasattr(elt, dtd_key_type): type_ = KEY_TYPE[getattr(elt, dtd_key_type)] elif key in self.objectspace.booleans_attributs: type_ = 'boolean' else: type_ = 'string' variable.type = type_ if type_ == 'symlink': variable.opt = self.objectspace.paths.get_variable_path(value, 'services', ) # variable.opt = value variable.multi = None else: variable.doc = key val = self.objectspace.value() val.type = type_ val.name = value variable.value = [val] self.objectspace.paths.add_variable('services', path, 'service', False, variable, ) return variable def _reorder_elts(self, name, elts, ): """Reorders by index the elts """ new_elts = {} # reorder elts by index for idx, elt in enumerate(elts): new_elts.setdefault(idx, []).append(elt) idxes = list(new_elts.keys()) idxes.sort() result_elts = [] for idx in idxes: for elt in new_elts[idx]: result_elts.append({'elt_name': name, 'elt': elt}) return result_elts def _update_override(self, file_, index, service_path, service_name, ): file_.name = f'/systemd/system/{service_name}.service.d/rougail.conf' # retrieve default value from File object for attr in ['owner', 'group', 'mode']: setattr(file_, attr, getattr(self.objectspace.file, attr)) if not hasattr(file_, 'source'): file_.source = f'{service_name}.service' self._update_file(file_, index, service_path, service_name, ) def _update_file(self, file_, index, service_path, service_name, ): if not hasattr(file_, 'file_type') or file_.file_type == "UnicodeOption": if not hasattr(file_, 'source'): file_.source = basename(file_.name) elif not hasattr(file_, 'source'): raise DictConsistencyError(_('attribute source mandatory for file with variable name ' 'for {}').format(file_.name)) class VariableAnnotator: def __init__(self, objectspace, ): self.objectspace = objectspace self.convert_variable() self.convert_helps() self.convert_auto_freeze() self.convert_separators() def convert_variable(self): def _convert_variable(variable, variable_type, ): if not hasattr(variable, 'type'): variable.type = 'string' if variable.type != 'symlink' and not hasattr(variable, 'description'): variable.description = variable.name if hasattr(variable, 'value'): for value in variable.value: if not hasattr(value, 'type'): value.type = variable.type value.name = CONVERSION.get(value.type, str)(value.name) for key, value in RENAME_ATTIBUTES.items(): setattr(variable, value, getattr(variable, key)) setattr(variable, key, None) if variable_type == 'follower': if variable.multi is True: variable.multi = 'submulti' else: variable.multi = True def _convert_valid_enum(namespace, variable, path, ): if variable.type in FORCE_CHOICE: check = self.objectspace.check() check.name = 'valid_enum' check.target = path check.namespace = namespace check.param = [] for value in FORCE_CHOICE[variable.type]: param = self.objectspace.param() param.text = value check.param.append(param) if not hasattr(self.objectspace.space, 'constraints'): self.objectspace.space.constraints = self.objectspace.constraints() self.objectspace.space.constraints.namespace = namespace if not hasattr(self.objectspace.space.constraints, 'check'): self.objectspace.space.constraints.check = [] self.objectspace.space.constraints.check.append(check) variable.type = 'string' if not hasattr(self.objectspace.space, 'variables'): return for families in self.objectspace.space.variables.values(): namespace = families.name if hasattr(families, 'family'): families.doc = families.name for family in families.family.values(): family.doc = family.name for key, value in RENAME_ATTIBUTES.items(): if hasattr(family, key): setattr(family, value, getattr(family, key)) setattr(family, key, None) family.name = normalize_family(family.name) if hasattr(family, 'variable'): for variable in family.variable.values(): if isinstance(variable, self.objectspace.Leadership): for idx, follower in enumerate(variable.variable): if idx == 0: variable_type = 'master' else: variable_type = 'follower' path = '{}.{}.{}.{}'.format(namespace, normalize_family(family.name), variable.name, follower.name) _convert_variable(follower, variable_type, ) _convert_valid_enum(namespace, follower, path, ) else: path = '{}.{}.{}'.format(namespace, normalize_family(family.name), variable.name) _convert_variable(variable, 'variable', ) _convert_valid_enum(namespace, variable, path, ) def convert_helps(self): if not hasattr(self.objectspace.space, 'help'): return helps = self.objectspace.space.help if hasattr(helps, 'variable'): for hlp in helps.variable.values(): variable = self.objectspace.paths.get_variable_obj(hlp.name) variable.help = hlp.text if hasattr(helps, 'family'): for hlp in helps.family.values(): variable = self.objectspace.paths.get_family_obj(hlp.name) variable.help = hlp.text del self.objectspace.space.help def convert_auto_freeze(self): # pylint: disable=C0111 def _convert_auto_freeze(variable, namespace): if variable.auto_freeze: new_condition = self.objectspace.condition() new_condition.name = 'auto_hidden_if_not_in' new_condition.namespace = namespace new_condition.source = FREEZE_AUTOFREEZE_VARIABLE new_param = self.objectspace.param() new_param.text = 'oui' new_condition.param = [new_param] new_target = self.objectspace.target() new_target.type = 'variable' path = variable.namespace + '.' + normalize_family(family.name) + '.' + variable.name new_target.name = path new_condition.target = [new_target] if not hasattr(self.objectspace.space.constraints, 'condition'): self.objectspace.space.constraints.condition = [] self.objectspace.space.constraints.condition.append(new_condition) if hasattr(self.objectspace.space, 'variables'): for variables in self.objectspace.space.variables.values(): if hasattr(variables, 'family'): namespace = variables.name for family in variables.family.values(): if hasattr(family, 'variable'): for variable in family.variable.values(): if isinstance(variable, self.objectspace.Leadership): for follower in variable.variable: _convert_auto_freeze(follower, namespace) else: _convert_auto_freeze(variable, namespace) def convert_separators(self): # pylint: disable=C0111,R0201 if not hasattr(self.objectspace.space, 'variables'): return for family in self.objectspace.space.variables.values(): if not hasattr(family, 'separators'): continue if hasattr(family.separators, 'separator'): for idx, separator in enumerate(family.separators.separator): option = self.objectspace.paths.get_variable_obj(separator.name) if hasattr(option, 'separator'): subpath = self.objectspace.paths.get_variable_path(separator.name, separator.namespace, ) raise DictConsistencyError(_('{} already has a separator').format(subpath)) option.separator = separator.text del family.separators class ConstraintAnnotator: def __init__(self, objectspace, eosfunc_file, ): if not hasattr(objectspace.space, 'constraints'): return self.objectspace = objectspace self.eosfunc = imp.load_source('eosfunc', eosfunc_file) self.valid_enums = {} if hasattr(self.objectspace.space.constraints, 'check'): self.check_check() self.check_replace_text() self.check_valid_enum() self.check_change_warning() self.convert_check() if hasattr(self.objectspace.space.constraints, 'condition'): self.check_params_target() self.filter_targets() self.convert_xxxlist_to_variable() self.check_condition_fallback_optional() self.check_choice_option_condition() self.remove_condition_with_empty_target() self.convert_condition() if hasattr(self.objectspace.space.constraints, 'fill'): self.convert_fill() self.remove_constraints() def check_check(self): remove_indexes = [] functions = dir(self.eosfunc) functions.extend(INTERNAL_FUNCTIONS) for check_idx, check in enumerate(self.objectspace.space.constraints.check): if not check.name in functions: raise DictConsistencyError(_('cannot find check function {}').format(check.name)) if hasattr(check, 'param'): param_option_indexes = [] for idx, param in enumerate(check.param): if param.type == 'variable' and not self.objectspace.paths.path_is_defined(param.text): if param.optional is True: param_option_indexes.append(idx) else: raise DictConsistencyError(_(f'unknown param {param.text} in check')) if param.type != 'variable': param.notraisepropertyerror = None param_option_indexes = list(set(param_option_indexes)) param_option_indexes.sort(reverse=True) for idx in param_option_indexes: check.param.pop(idx) if check.param == []: remove_indexes.append(check_idx) remove_indexes.sort(reverse=True) for idx in remove_indexes: del self.objectspace.space.constraints.check[idx] def check_replace_text(self): for check_idx, check in enumerate(self.objectspace.space.constraints.check): namespace = check.namespace if hasattr(check, 'param'): for idx, param in enumerate(check.param): if param.type == 'variable': param.text = self.objectspace.paths.get_variable_path(param.text, namespace) check.is_in_leadership = self.objectspace.paths.get_leader(check.target) != None # let's replace the target by the path check.target = self.objectspace.paths.get_variable_path(check.target, namespace) def check_valid_enum(self): remove_indexes = [] for idx, check in enumerate(self.objectspace.space.constraints.check): if check.name == 'valid_enum': if check.target in self.valid_enums: raise DictConsistencyError(_(f'valid_enum already set for {check.target}')) if not hasattr(check, 'param'): raise DictConsistencyError(_(f'param is mandatory for a valid_enum of variable {check.target}')) variable = self.objectspace.paths.get_variable_obj(check.target) values = self.load_params_in_valid_enum(check.param, variable.name, variable.type, ) self._set_valid_enum(variable, values, variable.type, check.target ) remove_indexes.append(idx) remove_indexes.sort(reverse=True) for idx in remove_indexes: del self.objectspace.space.constraints.check[idx] def load_params_in_valid_enum(self, params, variable_name, variable_type, ): has_variable = None values = [] for param in params: if param.type == 'variable': if has_variable is not None: raise DictConsistencyError(_(f'only one "variable" parameter is allowed for valid_enum of variable {variable_name}')) has_variable = True variable = self.objectspace.paths.get_variable_obj(param.text) if not variable.multi: raise DictConsistencyError(_(f'only multi "variable" parameter is allowed for valid_enum of variable {variable_name}')) values = param.text else: if has_variable: raise DictConsistencyError(_(f'only one "variable" parameter is allowed for valid_enum of variable {variable_name}')) if not hasattr(param, 'text'): if param.type == 'number': raise DictConsistencyError(_(f'value is mandatory for valid_enum of variable {variable_name}')) values.append(None) else: values.append(param.text) return values def check_change_warning(self): #convert level to "warnings_only" for check in self.objectspace.space.constraints.check: if check.level == 'warning': check.warnings_only = True else: check.warnings_only = False check.level = None def _get_family_variables_from_target(self, target, ): if target.type == 'variable': variable = self.objectspace.paths.get_variable_obj(target.name) family = self.objectspace.paths.get_family_obj(target.name.rsplit('.', 1)[0]) if isinstance(family, self.objectspace.Leadership) and family.name == variable.name: return family, family.variable return variable, [variable] # it's a family variable = self.objectspace.paths.get_family_obj(target.name) return variable, list(variable.variable.values()) def check_params_target(self): for condition in self.objectspace.space.constraints.condition: if not hasattr(condition, 'target'): raise DictConsistencyError(_('target is mandatory in condition')) for target in condition.target: if target.type.endswith('list') and condition.name not in ['disabled_if_in', 'disabled_if_not_in']: raise DictConsistencyError(_(f'target in condition for {target.type} not allow in {condition.name}')) def filter_targets(self): # pylint: disable=C0111 for condition_idx, condition in enumerate(self.objectspace.space.constraints.condition): namespace = condition.namespace for idx, target in enumerate(condition.target): if target.type == 'variable': if condition.source == target.name: raise DictConsistencyError(_('target name and source name must be different: {}').format(condition.source)) try: target.name = self.objectspace.paths.get_variable_path(target.name, namespace) except DictConsistencyError: # for optional variable pass elif target.type == 'family': try: target.name = self.objectspace.paths.get_family_path(target.name, namespace) except KeyError: raise DictConsistencyError(_('cannot found family {}').format(target.name)) def convert_xxxlist_to_variable(self): # pylint: disable=C0111 # transform *list to variable or family for condition_idx, condition in enumerate(self.objectspace.space.constraints.condition): new_targets = [] remove_targets = [] for target_idx, target in enumerate(condition.target): if target.type.endswith('list'): listname = target.type listvars = self.objectspace.list_conditions.get(listname, {}).get(target.name) if listvars: for listvar in listvars: variable = self.objectspace.paths.get_variable_obj(listvar) type_ = 'variable' new_target = self.objectspace.target() new_target.type = type_ new_target.name = listvar new_target.index = target.index new_targets.append(new_target) remove_targets.append(target_idx) remove_targets.sort(reverse=True) for target_idx in remove_targets: condition.target.pop(target_idx) condition.target.extend(new_targets) def check_condition_fallback_optional(self): # a condition with a fallback **and** the source variable doesn't exist remove_conditions = [] for idx, condition in enumerate(self.objectspace.space.constraints.condition): # fallback if condition.fallback is True and not self.objectspace.paths.path_is_defined(condition.source): apply_action = False if condition.name in ['disabled_if_in', 'mandatory_if_in', 'hidden_if_in']: apply_action = not condition.force_condition_on_fallback else: apply_action = condition.force_inverse_condition_on_fallback if apply_action: actions = self._get_condition_actions(condition.name) for target in condition.target: leader_or_variable, variables = self._get_family_variables_from_target(target) for action_idx, action in enumerate(actions): if action_idx == 0: setattr(leader_or_variable, action, True) else: for variable in variables: setattr(variable, action, True) remove_conditions.append(idx) continue remove_targets = [] # optional for idx, target in enumerate(condition.target): if target.optional is True and not self.objectspace.paths.path_is_defined(target.name): remove_targets.append(idx) remove_targets = list(set(remove_targets)) remove_targets.sort(reverse=True) for idx in remove_targets: condition.target.pop(idx) remove_conditions = list(set(remove_conditions)) remove_conditions.sort(reverse=True) for idx in remove_conditions: self.objectspace.space.constraints.condition.pop(idx) def _get_condition_actions(self, condition_name): if condition_name.startswith('disabled_if_'): return ['disabled'] elif condition_name.startswith('hidden_if_'): return ['hidden', 'frozen', 'force_default_on_freeze'] elif condition_name.startswith('mandatory_if_'): return ['mandatory'] elif condition_name == 'auto_hidden_if_not_in': return ['auto_frozen'] def check_choice_option_condition(self): # remove condition for ChoiceOption that don't have param remove_conditions = [] for condition_idx, condition in enumerate(self.objectspace.space.constraints.condition): namespace = condition.namespace condition.source = self.objectspace.paths.get_variable_path(condition.source, namespace, allow_source=True) src_variable = self.objectspace.paths.get_variable_obj(condition.source) valid_enum = None if condition.source in self.valid_enums and self.valid_enums[condition.source]['type'] == 'string': valid_enum = self.valid_enums[condition.source]['values'] if valid_enum is not None: remove_param = [] for param_idx, param in enumerate(condition.param): if param.text not in valid_enum: remove_param.append(param_idx) remove_param.sort(reverse=True) for idx in remove_param: del condition.param[idx] if condition.param == []: remove_targets = [] for target in condition.target: leader_or_variable, variables = self._get_family_variables_from_target(target) if condition.name == 'disabled_if_not_in': leader_or_variable.disabled = True elif condition.name == 'hidden_if_not_in': leader_or_variable.hidden = True for variable in variables: variable.frozen = True variable.force_default_on_freeze = True elif condition.name == 'mandatory_if_not_in': variable.mandatory = True remove_targets = list(set(remove_targets)) remove_targets.sort(reverse=True) for target_idx in remove_targets: condition.target.pop(target_idx) remove_conditions.append(condition_idx) remove_conditions = list(set(remove_conditions)) remove_conditions.sort(reverse=True) for idx in remove_conditions: self.objectspace.space.constraints.condition.pop(idx) def remove_condition_with_empty_target(self): remove_conditions = [] for condition_idx, condition in enumerate(self.objectspace.space.constraints.condition): if not condition.target: remove_conditions.append(condition_idx) remove_conditions = list(set(remove_conditions)) remove_conditions.sort(reverse=True) for idx in remove_conditions: self.objectspace.space.constraints.condition.pop(idx) def convert_condition(self): for condition in self.objectspace.space.constraints.condition: inverse = condition.name.endswith('_if_not_in') actions = self._get_condition_actions(condition.name) for param in condition.param: if hasattr(param, 'text'): param = param.text else: param = None for target in condition.target: leader_or_variable, variables = self._get_family_variables_from_target(target) # if option is already disable, do not apply disable_if_in if hasattr(leader_or_variable, actions[0]) and getattr(leader_or_variable, actions[0]) is True: continue for idx, action in enumerate(actions): prop = self.objectspace.property_() prop.type = 'calculation' prop.inverse = inverse prop.source = condition.source prop.expected = param prop.name = action if idx == 0: if not hasattr(leader_or_variable, 'property'): leader_or_variable.property = [] leader_or_variable.property.append(prop) else: for variable in variables: if not hasattr(variable, 'property'): variable.property = [] variable.property.append(prop) del self.objectspace.space.constraints.condition def _set_valid_enum(self, variable, values, type_, target): # value for choice's variable is mandatory variable.mandatory = True # build choice variable.choice = [] if isinstance(values, str): choice = self.objectspace.choice() choice.type = 'calculation' choice.name = values variable.choice.append(choice) else: self.valid_enums[target] = {'type': type_, 'values': values, } choices = [] for value in values: choice = self.objectspace.choice() try: if value is not None: choice.name = CONVERSION.get(type_, str)(value) else: choice.name = value except: raise DictConsistencyError(_(f'unable to change type of a valid_enum entry "{value}" is not a valid "{type_}" for "{variable.name}"')) if choice.name == '': choice.name = None choices.append(choice.name) choice.type = type_ variable.choice.append(choice) # check value or set first choice value has default value if hasattr(variable, 'value'): for value in variable.value: value.type = type_ try: cvalue = CONVERSION.get(type_, str)(value.name) except: raise DictConsistencyError(_(f'unable to change type of value "{value}" is not a valid "{type_}" for "{variable.name}"')) if cvalue not in choices: raise DictConsistencyError(_('value "{}" of variable "{}" is not in list of all expected values ({})').format(value.name, variable.name, choices)) else: new_value = self.objectspace.value() new_value.name = choices[0] new_value.type = type_ variable.value = [new_value] if not variable.choice: raise DictConsistencyError(_('empty valid enum is not allowed for variable {}').format(variable.name)) variable.type = 'choice' def convert_check(self): for check in self.objectspace.space.constraints.check: variable = self.objectspace.paths.get_variable_obj(check.target) name = check.name if name == 'valid_entier': if not hasattr(check, 'param'): raise DictConsistencyError(_('{} must have, at least, 1 param').format(name)) for param in check.param: if param.type not in ['string', 'number']: raise DictConsistencyError(_(f'param in "valid_entier" must not be a "{param.type}"')) if param.name == 'mini': variable.min_number = int(param.text) elif param.name == 'maxi': variable.max_number = int(param.text) else: raise DictConsistencyError(_(f'unknown parameter {param.text} in check "valid_entier" for variable {check.target}')) else: check_ = self.objectspace.check() if name == 'valid_differ': name = 'valid_not_equal' elif name == 'valid_network_netmask': params_len = 1 if len(check.param) != params_len: raise DictConsistencyError(_('{} must have {} param').format(name, params_len)) elif name == 'valid_ipnetmask': params_len = 1 if len(check.param) != params_len: raise DictConsistencyError(_('{} must have {} param').format(name, params_len)) name = 'valid_ip_netmask' elif name == 'valid_broadcast': params_len = 2 if len(check.param) != params_len: raise DictConsistencyError(_('{} must have {} param').format(name, params_len)) elif name == 'valid_in_network': params_len = 2 if len(check.param) != params_len: raise DictConsistencyError(_('{} must have {} param').format(name, params_len)) check_.name = name check_.warnings_only = check.warnings_only if hasattr(check, 'param'): check_.param = check.param if not hasattr(variable, 'check'): variable.check = [] variable.check.append(check_) del self.objectspace.space.constraints.check def convert_fill(self): # pylint: disable=C0111,R0912 # sort fill/auto by index fills = {fill.index: fill for idx, fill in enumerate(self.objectspace.space.constraints.fill)} indexes = list(fills.keys()) indexes.sort() targets = [] eosfunc = dir(self.eosfunc) for idx in indexes: fill = fills[idx] # test if it's redefined calculation if fill.target in targets and not fill.redefine: raise DictConsistencyError(_(f"A fill already exists for the target: {fill.target}")) targets.append(fill.target) # if fill.name not in eosfunc: raise DictConsistencyError(_('cannot find fill function {}').format(fill.name)) namespace = fill.namespace # let's replace the target by the path fill.target, suffix = self.objectspace.paths.get_variable_path(fill.target, namespace, with_suffix=True, ) if suffix is not None: raise DictConsistencyError(_(f'Cannot add fill function to "{fill.target}" only with the suffix "{suffix}"')) value = self.objectspace.value() value.type = 'calculation' value.name = fill.name if hasattr(fill, 'param'): param_to_delete = [] for fill_idx, param in enumerate(fill.param): if param.type not in ['suffix', 'string'] and not hasattr(param, 'text'): raise DictConsistencyError(_(f"All '{param.type}' variables must have a value in order to calculate {fill.target}")) if param.type == 'suffix' and hasattr(param, 'text'): raise DictConsistencyError(_(f"All '{param.type}' variables must not have a value in order to calculate {fill.target}")) if param.type == 'variable': try: param.text, suffix = self.objectspace.paths.get_variable_path(param.text, namespace, with_suffix=True, ) if suffix: param.suffix = suffix except DictConsistencyError as err: if param.optional is False: raise err param_to_delete.append(fill_idx) continue else: param.notraisepropertyerror = None param_to_delete.sort(reverse=True) for param_idx in param_to_delete: fill.param.pop(param_idx) value.param = fill.param variable = self.objectspace.paths.get_variable_obj(fill.target) variable.value = [value] del self.objectspace.space.constraints.fill def remove_constraints(self): if hasattr(self.objectspace.space.constraints, 'index'): del self.objectspace.space.constraints.index del self.objectspace.space.constraints.namespace if vars(self.objectspace.space.constraints): raise Exception('constraints again?') del self.objectspace.space.constraints class FamilyAnnotator: def __init__(self, objectspace, ): self.objectspace = objectspace self.remove_empty_families() self.change_variable_mode() self.change_family_mode() self.dynamic_families() def remove_empty_families(self): # pylint: disable=C0111,R0201 if hasattr(self.objectspace.space, 'variables'): for family in self.objectspace.space.variables.values(): if hasattr(family, 'family'): space = family.family removed_families = [] for family_name, family in space.items(): if not hasattr(family, 'variable') or len(family.variable) == 0: removed_families.append(family_name) for family_name in removed_families: del space[family_name] def change_family_mode(self): # pylint: disable=C0111 if not hasattr(self.objectspace.space, 'variables'): return for family in self.objectspace.space.variables.values(): if hasattr(family, 'family'): for family in family.family.values(): mode = modes_level[-1] for variable in family.variable.values(): if isinstance(variable, self.objectspace.Leadership): variable_mode = variable.variable[0].mode variable.variable[0].mode = None variable.mode = variable_mode else: variable_mode = variable.mode if variable_mode is not None and modes[mode] > modes[variable_mode]: mode = variable_mode family.mode = mode def dynamic_families(self): # pylint: disable=C0111 if not hasattr(self.objectspace.space, 'variables'): return for family in self.objectspace.space.variables.values(): if hasattr(family, 'family'): for family in family.family.values(): if 'dynamic' in vars(family): namespace = self.objectspace.paths.get_variable_namespace(family.dynamic) varpath = self.objectspace.paths.get_variable_path(family.dynamic, namespace) family.dynamic = varpath def annotate_variable(self, variable, family_mode, path, is_follower=False): # if the variable is mandatory and doesn't have any value # then the variable's mode is set to 'basic' if not hasattr(variable, 'value') and variable.type == 'boolean': new_value = self.objectspace.value() new_value.name = True new_value.type = 'boolean' variable.value = [new_value] if hasattr(variable, 'value') and variable.value: has_value = True for value in variable.value: if value.type == 'calculation': has_value = False has_variable = False if hasattr(value, 'param'): for param in value.param: if param.type == 'variable': has_variable = True break #if not has_variable: # # if one parameter is a variable, let variable choice if it's mandatory # variable.mandatory = True if has_value: # if has value but without any calculation variable.mandatory = True if variable.mandatory is True and (not hasattr(variable, 'value') or is_follower): variable.mode = modes_level[0] if variable.mode != None and modes[variable.mode] < modes[family_mode] and (not is_follower or variable.mode != modes_level[0]): variable.mode = family_mode if variable.hidden is True: variable.frozen = True if not variable.auto_save is True and 'force_default_on_freeze' not in vars(variable): variable.force_default_on_freeze = True def change_variable_mode(self): # pylint: disable=C0111 if not hasattr(self.objectspace.space, 'variables'): return for variables in self.objectspace.space.variables.values(): namespace = variables.name if hasattr(variables, 'family'): for family in variables.family.values(): family_mode = family.mode if hasattr(family, 'variable'): for variable in family.variable.values(): if isinstance(variable, self.objectspace.Leadership): mode = modes_level[-1] for idx, follower in enumerate(variable.variable): if follower.auto_save is True: raise DictConsistencyError(_(f'leader/followers {follower.name} could not be auto_save')) if follower.auto_freeze is True: raise DictConsistencyError(_('leader/followers {follower.name} could not be auto_freeze')) is_follower = idx != 0 path = '{}.{}.{}'.format(family.path, variable.name, follower.name) self.annotate_variable(follower, family_mode, path, is_follower) # leader's mode is minimum level if modes[variable.variable[0].mode] > modes[follower.mode]: follower.mode = variable.variable[0].mode variable.mode = variable.variable[0].mode else: # auto_save's variable is set in 'basic' mode if its mode is 'normal' if variable.auto_save is True and variable.mode != modes_level[-1]: variable.mode = modes_level[0] # auto_freeze's variable is set in 'basic' mode if its mode is 'normal' if variable.auto_freeze is True and variable.mode != modes_level[-1]: variable.mode = modes_level[0] path = '{}.{}'.format(family.path, variable.name) self.annotate_variable(variable, family_mode, path) class PropertyAnnotator: def __init__(self, objectspace): self.objectspace = objectspace self.convert_annotator() def convert_property(self, variable, ): properties = [] for prop in PROPERTIES: if hasattr(variable, prop): if getattr(variable, prop) == True: for subprop in CONVERT_PROPERTIES.get(prop, [prop]): properties.append(subprop) setattr(variable, prop, None) if hasattr(variable, 'mode') and variable.mode: properties.append(variable.mode) variable.mode = None if properties: variable.properties = frozenset(properties) def convert_annotator(self): # pylint: disable=C0111 if hasattr(self.objectspace.space, 'services'): self.convert_property(self.objectspace.space.services) for services in self.objectspace.space.services.service.values(): self.convert_property(services) for service in vars(services).values(): if isinstance(service, self.objectspace.family): self.convert_property(service) if hasattr(service, 'family'): self.convert_property(service) for family in service.family: self.convert_property(family) if hasattr(family, 'variable'): for variable in family.variable: self.convert_property(variable) if hasattr(self.objectspace.space, 'variables'): for variables in self.objectspace.space.variables.values(): if hasattr(variables, 'family'): for family in variables.family.values(): self.convert_property(family) if hasattr(family, 'variable'): for variable in family.variable.values(): if isinstance(variable, self.objectspace.Leadership): self.convert_property(variable) for follower in variable.variable: self.convert_property(follower) else: self.convert_property(variable)