From ca40aa5ec3610384582c4edd77fa6cfe955fe8b5 Mon Sep 17 00:00:00 2001 From: Emmanuel Garette Date: Thu, 24 Dec 2020 07:39:51 +0100 Subject: [PATCH] split annotator --- src/rougail/annotator/__init__.py | 23 ++ src/rougail/annotator/constrainte.py | 515 +++++++++++++++++++++++++++ src/rougail/annotator/family.py | 146 ++++++++ src/rougail/annotator/group.py | 135 +++++++ src/rougail/annotator/property.py | 55 +++ src/rougail/annotator/service.py | 251 +++++++++++++ src/rougail/annotator/variable.py | 193 ++++++++++ 7 files changed, 1318 insertions(+) create mode 100644 src/rougail/annotator/__init__.py create mode 100644 src/rougail/annotator/constrainte.py create mode 100644 src/rougail/annotator/family.py create mode 100644 src/rougail/annotator/group.py create mode 100644 src/rougail/annotator/property.py create mode 100644 src/rougail/annotator/service.py create mode 100644 src/rougail/annotator/variable.py diff --git a/src/rougail/annotator/__init__.py b/src/rougail/annotator/__init__.py new file mode 100644 index 00000000..c22fe358 --- /dev/null +++ b/src/rougail/annotator/__init__.py @@ -0,0 +1,23 @@ +from .group import GroupAnnotator +from .service import ServiceAnnotator, ERASED_ATTRIBUTES +from .variable import VariableAnnotator, CONVERT_OPTION +from .constrainte import ConstrainteAnnotator +from .family import FamilyAnnotator, modes +from .property import PropertyAnnotator + +class SpaceAnnotator: + """Transformations applied on a CreoleObjSpace instance + """ + def __init__(self, objectspace, eosfunc_file): + self.objectspace = objectspace + GroupAnnotator(objectspace) + ServiceAnnotator(objectspace) + VariableAnnotator(objectspace) + ConstrainteAnnotator(objectspace, + eosfunc_file, + ) + FamilyAnnotator(objectspace) + PropertyAnnotator(objectspace) + + +__all__ = ('SpaceAnnotator', 'ERASED_ATTRIBUTES', 'CONVERT_OPTION', 'modes') diff --git a/src/rougail/annotator/constrainte.py b/src/rougail/annotator/constrainte.py new file mode 100644 index 00000000..6c81202a --- /dev/null +++ b/src/rougail/annotator/constrainte.py @@ -0,0 +1,515 @@ +from typing import List +from importlib.machinery import SourceFileLoader + +from .variable import CONVERT_OPTION + +from ..i18n import _ +from ..utils import normalize_family +from ..error import DictConsistencyError +INTERNAL_FUNCTIONS = ['valid_enum', 'valid_in_network', 'valid_differ', 'valid_entier'] + + +class ConstrainteAnnotator: + def __init__(self, + objectspace, + eosfunc_file, + ): + if not hasattr(objectspace.space, 'constraints'): + return + self.objectspace = objectspace + eosfunc = SourceFileLoader('eosfunc', eosfunc_file).load_module() + self.functions = dir(eosfunc) + self.functions.extend(INTERNAL_FUNCTIONS) + self.valid_enums = {} + if hasattr(self.objectspace.space.constraints, 'check'): + self.check_check() + self.check_replace_text() + self.check_valid_enum() + self.check_change_warning() + self.convert_check() + if hasattr(self.objectspace.space.constraints, 'condition'): + self.check_params_target() + self.filter_targets() + self.convert_xxxlist_to_variable() + self.check_condition_fallback_optional() + self.check_choice_option_condition() + self.remove_condition_with_empty_target() + self.convert_condition() + if hasattr(self.objectspace.space.constraints, 'fill'): + self.convert_fill() + self.remove_constraints() + + def check_check(self): + remove_indexes = [] + for check_idx, check in enumerate(self.objectspace.space.constraints.check): + if not check.name in self.functions: + xmlfiles = self.objectspace.display_xmlfiles(check.xmlfiles) + raise DictConsistencyError(_(f'cannot find check function "{check.name}" in {xmlfiles}')) + if hasattr(check, 'param'): + param_option_indexes = [] + for idx, param in enumerate(check.param): + if param.type == 'variable' and not self.objectspace.paths.path_is_defined(param.text): + if param.optional is True: + param_option_indexes.append(idx) + else: + xmlfiles = self.objectspace.display_xmlfiles(check.xmlfiles) + raise DictConsistencyError(_(f'cannot find check param "{param.text}" in {xmlfiles}')) + if param.type != 'variable': + param.notraisepropertyerror = None + param_option_indexes = list(set(param_option_indexes)) + param_option_indexes.sort(reverse=True) + for idx in param_option_indexes: + check.param.pop(idx) + if check.param == []: + remove_indexes.append(check_idx) + remove_indexes.sort(reverse=True) + for idx in remove_indexes: + del self.objectspace.space.constraints.check[idx] + + def check_replace_text(self): + for check_idx, check in enumerate(self.objectspace.space.constraints.check): + namespace = check.namespace + if hasattr(check, 'param'): + for idx, param in enumerate(check.param): + if param.type == 'variable': + param.text = self.objectspace.paths.get_variable_path(param.text, namespace) + check.is_in_leadership = self.objectspace.paths.get_leader(check.target) != None + # let's replace the target by the path + check.target = self.objectspace.paths.get_variable_path(check.target, namespace) + + def check_valid_enum(self): + remove_indexes = [] + for idx, check in enumerate(self.objectspace.space.constraints.check): + if check.name == 'valid_enum': + if check.target in self.valid_enums: + old_xmlfiles = self.objectspace.display_xmlfiles(self.valid_enums[check.target]['xmlfiles']) + xmlfiles = self.objectspace.display_xmlfiles(check.xmlfiles) + raise DictConsistencyError(_(f'valid_enum define in {xmlfiles} but already set in {old_xmlfiles} for "{check.target}", did you forget remove_check?')) + if not hasattr(check, 'param'): + raise DictConsistencyError(_(f'param is mandatory for a valid_enum of variable {check.target}')) + variable = self.objectspace.paths.get_variable_obj(check.target) + values = self.load_params_in_valid_enum(check.param, + variable.name, + variable.type, + ) + self._set_valid_enum(variable, + values, + variable.type, + check.target, + check.xmlfiles, + ) + remove_indexes.append(idx) + remove_indexes.sort(reverse=True) + for idx in remove_indexes: + del self.objectspace.space.constraints.check[idx] + + def load_params_in_valid_enum(self, + params, + variable_name, + variable_type, + ): + has_variable = None + values = [] + for param in params: + if param.type == 'variable': + if has_variable is not None: + raise DictConsistencyError(_(f'only one "variable" parameter is allowed for valid_enum of variable {variable_name}')) + has_variable = True + variable = self.objectspace.paths.get_variable_obj(param.text) + if not variable.multi: + raise DictConsistencyError(_(f'only multi "variable" parameter is allowed for valid_enum of variable {variable_name}')) + values = param.text + else: + if has_variable: + raise DictConsistencyError(_(f'only one "variable" parameter is allowed for valid_enum of variable {variable_name}')) + if not hasattr(param, 'text'): + if param.type == 'number': + raise DictConsistencyError(_(f'value is mandatory for valid_enum of variable {variable_name}')) + values.append(None) + else: + values.append(param.text) + return values + + def check_change_warning(self): + #convert level to "warnings_only" + for check in self.objectspace.space.constraints.check: + if check.level == 'warning': + check.warnings_only = True + else: + check.warnings_only = False + check.level = None + + def _get_family_variables_from_target(self, + target, + ): + if target.type == 'variable': + variable = self.objectspace.paths.get_variable_obj(target.name) + family = self.objectspace.paths.get_family_obj(target.name.rsplit('.', 1)[0]) + # it's a leader, so apply property to leadership + if isinstance(family, self.objectspace.leadership) and family.name == variable.name: + return family, family.variable + return variable, [variable] + # it's a family + variable = self.objectspace.paths.get_family_obj(target.name) + return variable, list(variable.variable.values()) + + def check_params_target(self): + for condition in self.objectspace.space.constraints.condition: + if not hasattr(condition, 'target'): + raise DictConsistencyError(_('target is mandatory in condition')) + for target in condition.target: + if target.type.endswith('list') and condition.name not in ['disabled_if_in', 'disabled_if_not_in']: + raise DictConsistencyError(_(f'target in condition for {target.type} not allow in {condition.name}')) + + def filter_targets(self): # pylint: disable=C0111 + for condition_idx, condition in enumerate(self.objectspace.space.constraints.condition): + namespace = condition.namespace + for idx, target in enumerate(condition.target): + if target.type == 'variable': + if condition.source == target.name: + raise DictConsistencyError(_('target name and source name must be different: {}').format(condition.source)) + try: + target_names = [normalize_family(name) for name in target.name.split('.')] + target.name = self.objectspace.paths.get_variable_path('.'.join(target_names), namespace) + except DictConsistencyError: + # for optional variable + pass + elif target.type == 'family': + try: + target_names = [normalize_family(name) for name in target.name.split('.')] + target.name = self.objectspace.paths.get_family_path('.'.join(target_names), namespace) + except KeyError: + raise DictConsistencyError(_('cannot found family {}').format(target.name)) + + def convert_xxxlist_to_variable(self): # pylint: disable=C0111 + # transform *list to variable or family + for condition_idx, condition in enumerate(self.objectspace.space.constraints.condition): + new_targets = [] + remove_targets = [] + for target_idx, target in enumerate(condition.target): + if target.type.endswith('list'): + listname = target.type + listvars = self.objectspace.list_conditions.get(listname, + {}).get(target.name) + if listvars: + for listvar in listvars: + variable = self.objectspace.paths.get_variable_obj(listvar) + type_ = 'variable' + new_target = self.objectspace.target(variable.xmlfiles) + new_target.type = type_ + new_target.name = listvar + new_target.index = target.index + new_targets.append(new_target) + remove_targets.append(target_idx) + remove_targets.sort(reverse=True) + for target_idx in remove_targets: + condition.target.pop(target_idx) + condition.target.extend(new_targets) + + def check_condition_fallback_optional(self): + # a condition with a fallback **and** the source variable doesn't exist + remove_conditions = [] + for idx, condition in enumerate(self.objectspace.space.constraints.condition): + # fallback + if condition.fallback is True and not self.objectspace.paths.path_is_defined(condition.source): + apply_action = False + if condition.name in ['disabled_if_in', 'mandatory_if_in', 'hidden_if_in']: + apply_action = not condition.force_condition_on_fallback + else: + apply_action = condition.force_inverse_condition_on_fallback + if apply_action: + actions = self._get_condition_actions(condition.name) + for target in condition.target: + leader_or_variable, variables = self._get_family_variables_from_target(target) + for action_idx, action in enumerate(actions): + if action_idx == 0: + setattr(leader_or_variable, action, True) + else: + for variable in variables: + setattr(variable, action, True) + remove_conditions.append(idx) + continue + + remove_targets = [] + # optional + for idx, target in enumerate(condition.target): + if target.optional is True and not self.objectspace.paths.path_is_defined(target.name): + remove_targets.append(idx) + remove_targets = list(set(remove_targets)) + remove_targets.sort(reverse=True) + for idx in remove_targets: + condition.target.pop(idx) + remove_conditions = list(set(remove_conditions)) + remove_conditions.sort(reverse=True) + for idx in remove_conditions: + self.objectspace.space.constraints.condition.pop(idx) + + def _get_condition_actions(self, condition_name): + if condition_name.startswith('disabled_if_'): + return ['disabled'] + elif condition_name.startswith('hidden_if_'): + return ['hidden', 'frozen', 'force_default_on_freeze'] + elif condition_name.startswith('mandatory_if_'): + return ['mandatory'] + elif condition_name == 'auto_hidden_if_not_in': + return ['auto_frozen'] + + def check_choice_option_condition(self): + # remove condition for ChoiceOption that don't have param + remove_conditions = [] + for condition_idx, condition in enumerate(self.objectspace.space.constraints.condition): + namespace = condition.namespace + condition.source = self.objectspace.paths.get_variable_path(condition.source, namespace, allow_source=True) + src_variable = self.objectspace.paths.get_variable_obj(condition.source) + valid_enum = None + if condition.source in self.valid_enums and self.valid_enums[condition.source]['type'] == 'string': + valid_enum = self.valid_enums[condition.source]['values'] + if valid_enum is not None: + remove_param = [] + for param_idx, param in enumerate(condition.param): + if param.text not in valid_enum: + remove_param.append(param_idx) + remove_param.sort(reverse=True) + for idx in remove_param: + del condition.param[idx] + if condition.param == []: + remove_targets = [] + for target in condition.target: + leader_or_variable, variables = self._get_family_variables_from_target(target) + if condition.name == 'disabled_if_not_in': + leader_or_variable.disabled = True + elif condition.name == 'hidden_if_not_in': + leader_or_variable.hidden = True + for variable in variables: + variable.frozen = True + variable.force_default_on_freeze = True + elif condition.name == 'mandatory_if_not_in': + variable.mandatory = True + remove_targets = list(set(remove_targets)) + remove_targets.sort(reverse=True) + for target_idx in remove_targets: + condition.target.pop(target_idx) + remove_conditions.append(condition_idx) + remove_conditions = list(set(remove_conditions)) + remove_conditions.sort(reverse=True) + for idx in remove_conditions: + self.objectspace.space.constraints.condition.pop(idx) + + def remove_condition_with_empty_target(self): + remove_conditions = [] + for condition_idx, condition in enumerate(self.objectspace.space.constraints.condition): + if not condition.target: + remove_conditions.append(condition_idx) + remove_conditions = list(set(remove_conditions)) + remove_conditions.sort(reverse=True) + for idx in remove_conditions: + self.objectspace.space.constraints.condition.pop(idx) + + def convert_condition(self): + for condition in self.objectspace.space.constraints.condition: + inverse = condition.name.endswith('_if_not_in') + actions = self._get_condition_actions(condition.name) + for param in condition.param: + text = getattr(param, 'text', None) + for target in condition.target: + leader_or_variable, variables = self._get_family_variables_from_target(target) + # if option is already disable, do not apply disable_if_in + # check only the first action (example of multiple actions: 'hidden', 'frozen', 'force_default_on_freeze') + main_action = actions[0] + if getattr(leader_or_variable, main_action, False) is True: + continue + for idx, action in enumerate(actions): + prop = self.objectspace.property_(leader_or_variable.xmlfiles) + prop.type = 'calculation' + prop.inverse = inverse + prop.source = condition.source + prop.expected = text + prop.name = action + if idx == 0: + # main action is for the variable or family + if not hasattr(leader_or_variable, 'property'): + leader_or_variable.property = [] + leader_or_variable.property.append(prop) + else: + # other actions are set to the variable or children of family + for variable in variables: + if not hasattr(variable, 'property'): + variable.property = [] + variable.property.append(prop) + del self.objectspace.space.constraints.condition + + def _set_valid_enum(self, + variable, + values, + type_, + target: str, + xmlfiles: List[str], + ): + # value for choice's variable is mandatory + variable.mandatory = True + # build choice + variable.choice = [] + if isinstance(values, str): + choice = self.objectspace.choice() + choice.type = 'calculation' + choice.name = values + variable.choice.append(choice) + else: + self.valid_enums[target] = {'type': type_, + 'values': values, + 'xmlfiles': xmlfiles, + } + choices = [] + for value in values: + choice = self.objectspace.choice(variable.xmlfiles) + try: + if value is not None: + choice.name = CONVERT_OPTION[type_].get('func', str)(value) + else: + choice.name = value + except: + raise DictConsistencyError(_(f'unable to change type of a valid_enum entry "{value}" is not a valid "{type_}" for "{variable.name}"')) + if choice.name == '': + choice.name = None + choices.append(choice.name) + choice.type = type_ + variable.choice.append(choice) + # check value or set first choice value has default value + if hasattr(variable, 'value'): + for value in variable.value: + value.type = type_ + try: + cvalue = CONVERT_OPTION[type_].get('func', str)(value.name) + except: + raise DictConsistencyError(_(f'unable to change type of value "{value}" is not a valid "{type_}" for "{variable.name}"')) + if cvalue not in choices: + raise DictConsistencyError(_('value "{}" of variable "{}" is not in list of all expected values ({})').format(value.name, variable.name, choices)) + else: + new_value = self.objectspace.value(variable.xmlfiles) + new_value.name = choices[0] + new_value.type = type_ + variable.value = [new_value] + if not variable.choice: + raise DictConsistencyError(_('empty valid enum is not allowed for variable {}').format(variable.name)) + variable.type = 'choice' + + def convert_check(self): + for check in self.objectspace.space.constraints.check: + variable = self.objectspace.paths.get_variable_obj(check.target) + name = check.name + if name == 'valid_entier': + if not hasattr(check, 'param'): + raise DictConsistencyError(_('{} must have, at least, 1 param').format(name)) + for param in check.param: + if param.type not in ['string', 'number']: + raise DictConsistencyError(_(f'param in "valid_entier" must not be a "{param.type}"')) + if param.name == 'mini': + variable.min_number = int(param.text) + elif param.name == 'maxi': + variable.max_number = int(param.text) + else: + raise DictConsistencyError(_(f'unknown parameter {param.text} in check "valid_entier" for variable {check.target}')) + else: + check_ = self.objectspace.check(variable.xmlfiles) + if name == 'valid_differ': + name = 'valid_not_equal' + elif name == 'valid_network_netmask': + params_len = 1 + if len(check.param) != params_len: + xmlfiles = self.objectspace.display_xmlfiles(check.xmlfiles) + raise DictConsistencyError(_(f'{name} must have {params_len} param in {xmlfiles}')) + elif name == 'valid_ipnetmask': + params_len = 1 + if len(check.param) != params_len: + xmlfiles = self.objectspace.display_xmlfiles(check.xmlfiles) + raise DictConsistencyError(_(f'{name} must have {params_len} param in {xmlfiles}')) + name = 'valid_ip_netmask' + elif name == 'valid_broadcast': + params_len = 2 + if len(check.param) != params_len: + xmlfiles = self.objectspace.display_xmlfiles(check.xmlfiles) + raise DictConsistencyError(_(f'{name} must have {params_len} param in {xmlfiles}')) + elif name == 'valid_in_network': + if len(check.param) not in (1, 2): + params_len = 2 + xmlfiles = self.objectspace.display_xmlfiles(check.xmlfiles) + raise DictConsistencyError(_(f'{name} must have {params_len} param in {xmlfiles}')) + check_.name = name + check_.warnings_only = check.warnings_only + if hasattr(check, 'param'): + check_.param = check.param + if not hasattr(variable, 'check'): + variable.check = [] + variable.check.append(check_) + del self.objectspace.space.constraints.check + + def convert_fill(self): # pylint: disable=C0111,R0912 + # sort fill/auto by index + fills = {fill.index: fill for idx, fill in enumerate(self.objectspace.space.constraints.fill)} + indexes = list(fills.keys()) + indexes.sort() + targets = [] + for idx in indexes: + fill = fills[idx] + # test if it's redefined calculation + if fill.target in targets: + xmlfiles = self.objectspace.display_xmlfiles(fill.xmlfiles) + raise DictConsistencyError(_(f'A fill already exists for the target of "{fill.target}" created in {xmlfiles}')) + targets.append(fill.target) + # + if fill.name not in self.functions: + raise DictConsistencyError(_('cannot find fill function {}').format(fill.name)) + + namespace = fill.namespace + # let's replace the target by the path + fill.target, suffix = self.objectspace.paths.get_variable_path(fill.target, + namespace, + with_suffix=True, + ) + if suffix is not None: + raise DictConsistencyError(_(f'Cannot add fill function to "{fill.target}" only with the suffix "{suffix}"')) + variable = self.objectspace.paths.get_variable_obj(fill.target) + value = self.objectspace.value(variable.xmlfiles) + value.type = 'calculation' + value.name = fill.name + if hasattr(fill, 'param'): + param_to_delete = [] + for fill_idx, param in enumerate(fill.param): + if param.type not in ['suffix', 'string'] and not hasattr(param, 'text'): + raise DictConsistencyError(_(f"All '{param.type}' variables must have a value in order to calculate {fill.target}")) + if param.type == 'suffix' and hasattr(param, 'text'): + raise DictConsistencyError(_(f"All '{param.type}' variables must not have a value in order to calculate {fill.target}")) + if param.type == 'string': + if not hasattr(param, 'text'): + param.text = None + if param.type == 'variable': + try: + param.text, suffix = self.objectspace.paths.get_variable_path(param.text, + namespace, + with_suffix=True, + ) + if suffix: + param.suffix = suffix + except DictConsistencyError as err: + if param.optional is False: + raise err + param_to_delete.append(fill_idx) + continue + else: + param.notraisepropertyerror = None + param_to_delete.sort(reverse=True) + for param_idx in param_to_delete: + fill.param.pop(param_idx) + value.param = fill.param + variable.value = [value] + del self.objectspace.space.constraints.fill + + def remove_constraints(self): + if hasattr(self.objectspace.space.constraints, 'index'): + del self.objectspace.space.constraints.index + del self.objectspace.space.constraints.namespace + del self.objectspace.space.constraints.xmlfiles + if vars(self.objectspace.space.constraints): + raise Exception('constraints again?') + del self.objectspace.space.constraints + diff --git a/src/rougail/annotator/family.py b/src/rougail/annotator/family.py new file mode 100644 index 00000000..78361e00 --- /dev/null +++ b/src/rougail/annotator/family.py @@ -0,0 +1,146 @@ +from ..i18n import _ +from ..error import DictConsistencyError + + +#mode order is important +modes_level = ('basic', 'normal', 'expert') + + +class Mode(object): + def __init__(self, name, level): + self.name = name + self.level = level + def __gt__(self, other): + return other.level < self.level + + +def mode_factory(): + mode_obj = {} + for idx in range(len(modes_level)): + name = modes_level[idx] + mode_obj[name] = Mode(name, idx) + return mode_obj + +modes = mode_factory() + + +class FamilyAnnotator: + def __init__(self, + objectspace, + ): + self.objectspace = objectspace + self.remove_empty_families() + self.change_variable_mode() + self.change_family_mode() + self.dynamic_families() + + def remove_empty_families(self): # pylint: disable=C0111,R0201 + if hasattr(self.objectspace.space, 'variables'): + for family in self.objectspace.space.variables.values(): + if hasattr(family, 'family'): + space = family.family + removed_families = [] + for family_name, family in space.items(): + if not hasattr(family, 'variable') or len(family.variable) == 0: + removed_families.append(family_name) + for family_name in removed_families: + del space[family_name] + + def change_family_mode(self): # pylint: disable=C0111 + if not hasattr(self.objectspace.space, 'variables'): + return + for family in self.objectspace.space.variables.values(): + if hasattr(family, 'family'): + for family in family.family.values(): + mode = modes_level[-1] + for variable in family.variable.values(): + if isinstance(variable, self.objectspace.leadership): + variable_mode = variable.variable[0].mode + variable.variable[0].mode = None + variable.mode = variable_mode + else: + variable_mode = variable.mode + if variable_mode is not None and modes[mode] > modes[variable_mode]: + mode = variable_mode + family.mode = mode + + def dynamic_families(self): # pylint: disable=C0111 + if not hasattr(self.objectspace.space, 'variables'): + return + for family in self.objectspace.space.variables.values(): + if hasattr(family, 'family'): + for family in family.family.values(): + if 'dynamic' in vars(family): + namespace = self.objectspace.paths.get_variable_namespace(family.dynamic) + varpath = self.objectspace.paths.get_variable_path(family.dynamic, namespace) + family.dynamic = varpath + + def annotate_variable(self, variable, family_mode, path, is_follower=False): + # if the variable is mandatory and doesn't have any value + # then the variable's mode is set to 'basic' + if not hasattr(variable, 'value') and variable.type == 'boolean': + new_value = self.objectspace.value(variable.xmlfiles) + new_value.name = True + new_value.type = 'boolean' + variable.value = [new_value] + if hasattr(variable, 'value') and variable.value: + has_value = True + for value in variable.value: + if value.type == 'calculation': + has_value = False + has_variable = False + if hasattr(value, 'param'): + for param in value.param: + if param.type == 'variable': + has_variable = True + break + #if not has_variable: + # # if one parameter is a variable, let variable choice if it's mandatory + # variable.mandatory = True + if has_value: + # if has value but without any calculation + variable.mandatory = True + if variable.mandatory is True and (not hasattr(variable, 'value') or is_follower): + variable.mode = modes_level[0] + if variable.mode != None and modes[variable.mode] < modes[family_mode] and (not is_follower or variable.mode != modes_level[0]): + variable.mode = family_mode + if variable.hidden is True: + variable.frozen = True + if not variable.auto_save is True and 'force_default_on_freeze' not in vars(variable): + variable.force_default_on_freeze = True + + def change_variable_mode(self): # pylint: disable=C0111 + if not hasattr(self.objectspace.space, 'variables'): + return + for variables in self.objectspace.space.variables.values(): + namespace = variables.name + if hasattr(variables, 'family'): + for family in variables.family.values(): + family_mode = family.mode + if hasattr(family, 'variable'): + for variable in family.variable.values(): + + if isinstance(variable, self.objectspace.leadership): + mode = modes_level[-1] + for idx, follower in enumerate(variable.variable): + if follower.auto_save is True: + raise DictConsistencyError(_(f'leader/followers {follower.name} could not be auto_save')) + if follower.auto_freeze is True: + raise DictConsistencyError(_('leader/followers {follower.name} could not be auto_freeze')) + is_follower = idx != 0 + path = '{}.{}.{}'.format(family.path, variable.name, follower.name) + self.annotate_variable(follower, family_mode, path, is_follower) + # leader's mode is minimum level + if modes[variable.variable[0].mode] > modes[follower.mode]: + follower.mode = variable.variable[0].mode + variable.mode = variable.variable[0].mode + else: + # auto_save's variable is set in 'basic' mode if its mode is 'normal' + if variable.auto_save is True and variable.mode != modes_level[-1]: + variable.mode = modes_level[0] + # auto_freeze's variable is set in 'basic' mode if its mode is 'normal' + if variable.auto_freeze is True and variable.mode != modes_level[-1]: + variable.mode = modes_level[0] + path = '{}.{}'.format(family.path, variable.name) + self.annotate_variable(variable, family_mode, path) + diff --git a/src/rougail/annotator/group.py b/src/rougail/annotator/group.py new file mode 100644 index 00000000..532ba9f3 --- /dev/null +++ b/src/rougail/annotator/group.py @@ -0,0 +1,135 @@ +from typing import List + +from ..i18n import _ +from ..error import DictConsistencyError + + +class GroupAnnotator: + def __init__(self, + objectspace, + ): + self.objectspace = objectspace + if not hasattr(self.objectspace.space, 'constraints') or not hasattr(self.objectspace.space.constraints, 'group'): + return + self.convert_groups() + + def convert_groups(self): # pylint: disable=C0111 + for group in self.objectspace.space.constraints.group: + leader_fullname = group.leader + leader_family_name = self.objectspace.paths.get_variable_family_name(leader_fullname) + leader_name = self.objectspace.paths.get_variable_name(leader_fullname) + namespace = self.objectspace.paths.get_variable_namespace(leader_fullname) + if '.' not in leader_fullname: + leader_fullname = '.'.join([namespace, leader_family_name, leader_fullname]) + follower_names = list(group.follower.keys()) + has_a_leader = False + ori_leader_family = self.objectspace.paths.get_family_obj(leader_fullname.rsplit('.', 1)[0]) + for variable in list(ori_leader_family.variable.values()): + if has_a_leader: + # it's a follower + self.manage_follower(namespace, + leader_family_name, + variable, + leadership_name, + follower_names, + leader_space, + leader_is_hidden, + ) + ori_leader_family.variable.pop(variable.name) + if follower_names == []: + # no more follower + break + elif variable.name == leader_name: + # it's a leader + if isinstance(variable, self.objectspace.leadership): + # append follower to an existed leadership + leader_space = variable + # if variable.hidden: + # leader_is_hidden = True + else: + leader_space = self.objectspace.leadership(variable.xmlfiles) + if hasattr(group, 'name'): + leadership_name = group.name + else: + leadership_name = leader_name + leader_is_hidden = self.manage_leader(leader_space, + leader_family_name, + leadership_name, + leader_name, + namespace, + variable, + group, + leader_fullname, + ) + has_a_leader = True + else: + raise DictConsistencyError(_('cannot found followers "{}"').format('", "'.join(follower_names))) + del self.objectspace.space.constraints.group + + def manage_leader(self, + leader_space: 'Leadership', + leader_family_name: str, + leadership_name: str, + leader_name: str, + namespace: str, + variable: 'Variable', + group: 'Group', + leader_fullname: str, + ) -> None: + # manage leader's variable + if variable.multi is not True: + raise DictConsistencyError(_('the variable {} in a group must be multi').format(variable.name)) + leader_space.variable = [] + leader_space.name = leadership_name + leader_space.hidden = variable.hidden + if variable.hidden: + leader_is_hidden = True + variable.frozen = True + variable.force_default_on_freeze = True + else: + leader_is_hidden = False + variable.hidden = None + if hasattr(group, 'description'): + leader_space.doc = group.description + elif hasattr(variable, 'description'): + leader_space.doc = variable.description + else: + leader_space.doc = variable.name + leadership_path = namespace + '.' + leader_family_name + '.' + leadership_name + self.objectspace.paths.add_family(namespace, + leadership_path, + leader_space, + ) + leader_family = self.objectspace.space.variables[namespace].family[leader_family_name] + leader_family.variable[leader_name] = leader_space + leader_space.variable.append(variable) + self.objectspace.paths.set_leader(namespace, + leader_family_name, + leader_name, + leader_name, + ) + leader_space.path = leader_fullname + return leader_is_hidden + + def manage_follower(self, + namespace: str, + leader_family_name: str, + variable: 'Variable', + leader_name: str, + follower_names: List[str], + leader_space: 'Leadership', + leader_is_hidden: bool, + ) -> None: + if variable.name != follower_names[0]: + raise DictConsistencyError(_('cannot found this follower {}').format(follower_names[0])) + follower_names.remove(variable.name) + if leader_is_hidden: + variable.frozen = True + variable.force_default_on_freeze = True + leader_space.variable.append(variable) # pylint: disable=E1101 + self.objectspace.paths.set_leader(namespace, + leader_family_name, + variable.name, + leader_name, + ) + diff --git a/src/rougail/annotator/property.py b/src/rougail/annotator/property.py new file mode 100644 index 00000000..741538e3 --- /dev/null +++ b/src/rougail/annotator/property.py @@ -0,0 +1,55 @@ +PROPERTIES = ('hidden', 'frozen', 'auto_freeze', 'auto_save', 'force_default_on_freeze', + 'force_store_value', 'disabled', 'mandatory') +CONVERT_PROPERTIES = {'auto_save': ['force_store_value'], 'auto_freeze': ['force_store_value', 'auto_freeze']} + + +class PropertyAnnotator: + def __init__(self, objectspace): + self.objectspace = objectspace + self.convert_annotator() + + def convert_property(self, + variable, + ): + properties = [] + for prop in PROPERTIES: + if hasattr(variable, prop): + if getattr(variable, prop) == True: + for subprop in CONVERT_PROPERTIES.get(prop, [prop]): + properties.append(subprop) + setattr(variable, prop, None) + if hasattr(variable, 'mode') and variable.mode: + properties.append(variable.mode) + variable.mode = None + if properties: + variable.properties = frozenset(properties) + + def convert_annotator(self): # pylint: disable=C0111 + if hasattr(self.objectspace.space, 'services'): + self.convert_property(self.objectspace.space.services) + for services in self.objectspace.space.services.service.values(): + self.convert_property(services) + for service in vars(services).values(): + if isinstance(service, self.objectspace.family): + self.convert_property(service) + if hasattr(service, 'family'): + self.convert_property(service) + for family in service.family: + self.convert_property(family) + if hasattr(family, 'variable'): + for variable in family.variable: + self.convert_property(variable) + if hasattr(self.objectspace.space, 'variables'): + for variables in self.objectspace.space.variables.values(): + if hasattr(variables, 'family'): + for family in variables.family.values(): + self.convert_property(family) + if hasattr(family, 'variable'): + for variable in family.variable.values(): + if isinstance(variable, self.objectspace.leadership): + self.convert_property(variable) + for follower in variable.variable: + self.convert_property(follower) + else: + self.convert_property(variable) + diff --git a/src/rougail/annotator/service.py b/src/rougail/annotator/service.py new file mode 100644 index 00000000..e4e72116 --- /dev/null +++ b/src/rougail/annotator/service.py @@ -0,0 +1,251 @@ +from os.path import basename + +from ..i18n import _ +from ..utils import normalize_family +from ..error import DictConsistencyError +# a CreoleObjSpace's attribute has some annotations +# that shall not be present in the exported (flatened) XML +ERASED_ATTRIBUTES = ('redefine', 'exists', 'fallback', 'optional', 'remove_check', 'namespace', + 'remove_condition', 'path', 'instance_mode', 'index', 'is_in_leadership', + 'level', 'remove_fill', 'xmlfiles') + + +KEY_TYPE = {'variable': 'symlink', + 'SymLinkOption': 'symlink', + 'PortOption': 'port', + 'UnicodeOption': 'string', + 'NetworkOption': 'network', + 'NetmaskOption': 'netmask', + 'URLOption': 'web_address', + 'FilenameOption': 'filename'} + + +class ServiceAnnotator: + """Manage service's object + for example:: + + + + 123 + + + + """ + def __init__(self, objectspace): + self.objectspace = objectspace + self.convert_services() + + def convert_services(self): + if not hasattr(self.objectspace.space, 'services'): + return + if not hasattr(self.objectspace.space.services, 'service'): + del self.objectspace.space.services + return + self.objectspace.space.services.hidden = True + self.objectspace.space.services.name = 'services' + self.objectspace.space.services.doc = 'services' + families = {} + for idx, service_name in enumerate(self.objectspace.space.services.service.keys()): + service = self.objectspace.space.services.service[service_name] + new_service = self.objectspace.service(service.xmlfiles) + for elttype, values in vars(service).items(): + if not isinstance(values, (dict, list)) or elttype in ERASED_ATTRIBUTES: + setattr(new_service, elttype, values) + continue + eltname = elttype + 's' + path = '.'.join(['services', service_name, eltname]) + family = self.gen_family(eltname, + path, + service.xmlfiles, + ) + if isinstance(values, dict): + values = list(values.values()) + family.family = self.make_group_from_elts(service_name, + elttype, + values, + path, + ) + setattr(new_service, elttype, family) + new_service.doc = new_service.name + families[service_name] = new_service + self.objectspace.space.services.service = families + + def gen_family(self, + name, + path, + xmlfiles + ): + family = self.objectspace.family(xmlfiles) + family.name = normalize_family(name) + family.doc = name + family.mode = None + self.objectspace.paths.add_family('services', + path, + family, + ) + return family + + def make_group_from_elts(self, + service_name, + name, + elts, + path, + ): + """Splits each objects into a group (and `OptionDescription`, in tiramisu terms) + and build elements and its attributes (the `Options` in tiramisu terms) + """ + families = [] + new_elts = self._reorder_elts(name, + elts, + ) + for index, elt_info in enumerate(new_elts): + elt = elt_info['elt'] + elt_name = elt_info['elt_name'] + + # try to launch _update_xxxx() function + update_elt = '_update_' + elt_name + if hasattr(self, update_elt): + getattr(self, update_elt)(elt, + index, + path, + service_name, + ) + + idx = 0 + while True: + if hasattr(elt, 'source'): + c_name = elt.source + else: + c_name = elt.name + if idx: + c_name += f'_{idx}' + subpath = '{}.{}'.format(path, c_name) + if not self.objectspace.paths.family_is_defined(subpath): + break + idx += 1 + family = self.gen_family(c_name, + subpath, + elt.xmlfiles, + ) + family.variable = [] + listname = '{}list'.format(name) + activate_path = '.'.join([subpath, 'activate']) + for key in dir(elt): + if key.startswith('_') or key.endswith('_type') or key in ERASED_ATTRIBUTES: + continue + value = getattr(elt, key) + if key == listname: + self.objectspace.list_conditions.setdefault(listname, + {}).setdefault( + value, + []).append(activate_path) + continue + family.variable.append(self._generate_element(elt_name, + key, + value, + elt, + f'{subpath}.{key}' + )) + # FIXME ne devrait pas etre True par défaut + # devrait etre un calcule + family.variable.append(self._generate_element(elt_name, + 'activate', + True, + elt, + activate_path, + )) + families.append(family) + return families + + def _generate_element(self, + elt_name, + key, + value, + elt, + path, + ): + variable = self.objectspace.variable(elt.xmlfiles) + variable.name = normalize_family(key) + variable.mode = None + if key == 'name': + true_key = elt_name + else: + true_key = key + dtd_key_type = true_key + '_type' + if key == 'activate': + type_ = 'boolean' + elif hasattr(elt, dtd_key_type): + type_ = KEY_TYPE[getattr(elt, dtd_key_type)] + elif key in self.objectspace.booleans_attributs: + type_ = 'boolean' + else: + type_ = 'string' + variable.type = type_ + if type_ == 'symlink': + variable.opt = self.objectspace.paths.get_variable_path(value, + 'services', + ) +# variable.opt = value + variable.multi = None + else: + variable.doc = key + val = self.objectspace.value(elt.xmlfiles) + val.type = type_ + val.name = value + variable.value = [val] + self.objectspace.paths.add_variable('services', + path, + 'service', + False, + variable, + ) + return variable + + def _reorder_elts(self, + name, + elts, + ): + """Reorders by index the elts + """ + new_elts = {} + # reorder elts by index + for idx, elt in enumerate(elts): + new_elts.setdefault(idx, []).append(elt) + idxes = list(new_elts.keys()) + idxes.sort() + result_elts = [] + for idx in idxes: + for elt in new_elts[idx]: + result_elts.append({'elt_name': name, 'elt': elt}) + return result_elts + + def _update_override(self, + file_, + index, + service_path, + service_name, + ): + file_.name = f'/systemd/system/{service_name}.service.d/rougail.conf' + # retrieve default value from File object + for attr in ['owner', 'group', 'mode']: + setattr(file_, attr, getattr(self.objectspace.file, attr)) + if not hasattr(file_, 'source'): + file_.source = f'{service_name}.service' + self._update_file(file_, + index, + service_path, + service_name, + ) + + def _update_file(self, + file_, + index, + service_path, + service_name, + ): + if not hasattr(file_, 'file_type') or file_.file_type == "UnicodeOption": + if not hasattr(file_, 'source'): + file_.source = basename(file_.name) + elif not hasattr(file_, 'source'): + raise DictConsistencyError(_('attribute source mandatory for file with variable name for {}').format(file_.name)) + diff --git a/src/rougail/annotator/variable.py b/src/rougail/annotator/variable.py new file mode 100644 index 00000000..69e8b512 --- /dev/null +++ b/src/rougail/annotator/variable.py @@ -0,0 +1,193 @@ +from ..i18n import _ +from ..utils import normalize_family +from ..error import DictConsistencyError + + +CONVERT_OPTION = {'number': dict(opttype="IntOption", func=int), + 'float': dict(opttype="FloatOption", func=float), + 'choice': dict(opttype="ChoiceOption"), + 'string': dict(opttype="StrOption"), + 'password': dict(opttype="PasswordOption"), + 'mail': dict(opttype="EmailOption"), + 'boolean': dict(opttype="BoolOption"), + 'symlink': dict(opttype="SymLinkOption"), + 'filename': dict(opttype="FilenameOption"), + 'date': dict(opttype="DateOption"), + 'unix_user': dict(opttype="UsernameOption"), + 'ip': dict(opttype="IPOption", initkwargs={'allow_reserved': True}), + 'local_ip': dict(opttype="IPOption", initkwargs={'private_only': True, 'warnings_only': True}), + 'netmask': dict(opttype="NetmaskOption"), + 'network': dict(opttype="NetworkOption"), + 'broadcast': dict(opttype="BroadcastOption"), + 'netbios': dict(opttype="DomainnameOption", initkwargs={'type': 'netbios', 'warnings_only': True}), + 'domain': dict(opttype="DomainnameOption", initkwargs={'type': 'domainname', 'allow_ip': False}), + 'hostname': dict(opttype="DomainnameOption", initkwargs={'type': 'hostname', 'allow_ip': False}), + 'web_address': dict(opttype="URLOption", initkwargs={'allow_ip': True, 'allow_without_dot': True}), + 'port': dict(opttype="PortOption", initkwargs={'allow_private': True}), + 'mac': dict(opttype="MACOption"), + 'cidr': dict(opttype="IPOption", initkwargs={'cidr': True}), + 'network_cidr': dict(opttype="NetworkOption", initkwargs={'cidr': True}), + } + + +FORCE_CHOICE = {'oui/non': ['oui', 'non'], + 'on/off': ['on', 'off'], + 'yes/no': ['yes', 'no'], + 'schedule': ['none', 'daily', 'weekly', 'monthly'], + 'schedulemod': ['pre', 'post']} + + +FREEZE_AUTOFREEZE_VARIABLE = 'module_instancie' + + +RENAME_ATTIBUTES = {'description': 'doc'} + + +class VariableAnnotator: + def __init__(self, + objectspace, + ): + self.objectspace = objectspace + self.convert_variable() + self.convert_auto_freeze() + self.convert_separators() + + def convert_variable(self): + def _convert_variable(variable, + variable_type, + ): + if not hasattr(variable, 'type'): + variable.type = 'string' + if variable.type != 'symlink' and not hasattr(variable, 'description'): + variable.description = variable.name + if hasattr(variable, 'value'): + for value in variable.value: + if not hasattr(value, 'type'): + value.type = variable.type + value.name = CONVERT_OPTION.get(value.type, {}).get('func', str)(value.name) + for key, value in RENAME_ATTIBUTES.items(): + setattr(variable, value, getattr(variable, key)) + setattr(variable, key, None) + if variable_type == 'follower': + if variable.multi is True: + variable.multi = 'submulti' + else: + variable.multi = True + + def _convert_valid_enum(namespace, + variable, + path, + ): + if variable.type in FORCE_CHOICE: + check = self.objectspace.check(variable.xmlfiles) + check.name = 'valid_enum' + check.target = path + check.namespace = namespace + check.param = [] + for value in FORCE_CHOICE[variable.type]: + param = self.objectspace.param(variable.xmlfiles) + param.text = value + check.param.append(param) + if not hasattr(self.objectspace.space, 'constraints'): + self.objectspace.space.constraints = self.objectspace.constraints(variable.xmlfiles) + self.objectspace.space.constraints.namespace = namespace + if not hasattr(self.objectspace.space.constraints, 'check'): + self.objectspace.space.constraints.check = [] + self.objectspace.space.constraints.check.append(check) + variable.type = 'string' + + def _valid_type(variable): + if variable.type not in CONVERT_OPTION: + xmlfiles = self.objectspace.display_xmlfiles(variable.xmlfiles) + raise DictConsistencyError(_(f'unvalid type "{variable.type}" for variable "{variable.name}" in {xmlfiles}')) + + if not hasattr(self.objectspace.space, 'variables'): + return + for families in self.objectspace.space.variables.values(): + namespace = families.name + if hasattr(families, 'family'): + families.doc = families.name + for family in families.family.values(): + family.doc = family.name + for key, value in RENAME_ATTIBUTES.items(): + if hasattr(family, key): + setattr(family, value, getattr(family, key)) + setattr(family, key, None) + family.name = normalize_family(family.name) + if hasattr(family, 'variable'): + for variable in family.variable.values(): + if isinstance(variable, self.objectspace.leadership): + for idx, follower in enumerate(variable.variable): + if idx == 0: + variable_type = 'master' + else: + variable_type = 'follower' + path = '{}.{}.{}.{}'.format(namespace, normalize_family(family.name), variable.name, follower.name) + _convert_variable(follower, + variable_type, + ) + _convert_valid_enum(namespace, + follower, + path, + ) + _valid_type(follower) + else: + path = '{}.{}.{}'.format(namespace, normalize_family(family.name), variable.name) + _convert_variable(variable, + 'variable', + ) + _convert_valid_enum(namespace, + variable, + path, + ) + _valid_type(variable) + + def convert_auto_freeze(self): # pylint: disable=C0111 + def _convert_auto_freeze(variable, namespace): + if variable.auto_freeze: + new_condition = self.objectspace.condition(variable.xmlfiles) + new_condition.name = 'auto_hidden_if_not_in' + new_condition.namespace = namespace + new_condition.source = FREEZE_AUTOFREEZE_VARIABLE + new_param = self.objectspace.param(variable.xmlfiles) + new_param.text = 'oui' + new_condition.param = [new_param] + new_target = self.objectspace.target(variable.xmlfiles) + new_target.type = 'variable' + path = variable.namespace + '.' + normalize_family(family.name) + '.' + variable.name + new_target.name = path + new_condition.target = [new_target] + if not hasattr(self.objectspace.space.constraints, 'condition'): + self.objectspace.space.constraints.condition = [] + self.objectspace.space.constraints.condition.append(new_condition) + if hasattr(self.objectspace.space, 'variables'): + for variables in self.objectspace.space.variables.values(): + if hasattr(variables, 'family'): + namespace = variables.name + for family in variables.family.values(): + if hasattr(family, 'variable'): + for variable in family.variable.values(): + if isinstance(variable, self.objectspace.leadership): + for follower in variable.variable: + _convert_auto_freeze(follower, namespace) + else: + _convert_auto_freeze(variable, namespace) + + def convert_separators(self): # pylint: disable=C0111,R0201 + if not hasattr(self.objectspace.space, 'variables'): + return + for family in self.objectspace.space.variables.values(): + if not hasattr(family, 'separators'): + continue + if hasattr(family.separators, 'separator'): + for idx, separator in enumerate(family.separators.separator): + option = self.objectspace.paths.get_variable_obj(separator.name) + if hasattr(option, 'separator'): + subpath = self.objectspace.paths.get_variable_path(separator.name, + separator.namespace, + ) + xmlfiles = self.objectspace.display_xmlfiles(separator.xmlfiles) + raise DictConsistencyError(_(f'{subpath} already has a separator in {xmlfiles}')) + option.separator = separator.text + del family.separators +