from .i18n import _ from .utils import normalize_family from .error import OperationError, DictConsistencyError from .config import variable_namespace class Path: """Helper class to handle the `path` attribute of a CreoleObjSpace instance. sample: path="creole.general.condition" """ def __init__(self): self.variables = {} self.families = {} self.full_paths = {} # Family def add_family(self, namespace: str, name: str, variableobj: str, ) -> str: # pylint: disable=C0111 if '.' not in name and namespace == variable_namespace: full_name = '.'.join([namespace, name]) self.full_paths[name] = full_name else: full_name = name if full_name in self.families and self.families[full_name]['variableobj'] != variableobj: raise DictConsistencyError(_(f'Duplicate family name {name}')) self.families[full_name] = dict(name=name, namespace=namespace, variableobj=variableobj, ) def get_family_path(self, name: str, current_namespace: str, ) -> str: # pylint: disable=C0111 name = normalize_family(name, check_name=False, allow_dot=True, ) if '.' not in name and current_namespace == variable_namespace and name in self.full_paths: name = self.full_paths[name] if current_namespace is None: # pragma: no cover raise OperationError('current_namespace must not be None') dico = self.families[name] if dico['namespace'] != variable_namespace and current_namespace != dico['namespace']: raise DictConsistencyError(_('A family located in the {} namespace ' 'shall not be used in the {} namespace').format( dico['namespace'], current_namespace)) return dico['name'] def get_family_obj(self, name: str, ) -> 'Family': # pylint: disable=C0111 if '.' not in name and name in self.full_paths: name = self.full_paths[name] if name not in self.families: raise DictConsistencyError(_('unknown family {}').format(name)) dico = self.families[name] return dico['variableobj'] def family_is_defined(self, name: str, ) -> str: # pylint: disable=C0111 if '.' not in name and name not in self.families and name in self.full_paths: return True return name in self.families # Leadership def set_leader(self, namespace: str, leader_family_name: str, name: str, leader_name: str, ) -> None: # pylint: disable=C0111 # need rebuild path and move object in new path old_path = namespace + '.' + leader_family_name + '.' + name dico = self._get_variable(old_path) del self.variables[old_path] new_path = namespace + '.' + leader_family_name + '.' + leader_name + '.' + name self.add_variable(namespace, new_path, dico['family'], False, dico['variableobj'], ) if namespace == variable_namespace: self.full_paths[name] = new_path else: name = new_path dico = self._get_variable(name) if dico['leader'] != None: raise DictConsistencyError(_('Already defined leader {} for variable' ' {}'.format(dico['leader'], name))) dico['leader'] = leader_name def get_leader(self, name): # pylint: disable=C0111 return self._get_variable(name)['leader'] # Variable def add_variable(self, namespace: str, name: str, family: str, is_dynamic: bool, variableobj, ) -> str: # pylint: disable=C0111 if '.' not in name: full_name = '.'.join([namespace, family, name]) self.full_paths[name] = full_name else: full_name = name if namespace == variable_namespace: name = name.rsplit('.', 1)[1] self.variables[full_name] = dict(name=name, family=family, namespace=namespace, leader=None, is_dynamic=is_dynamic, variableobj=variableobj) def get_variable_name(self, name, ): # pylint: disable=C0111 return self._get_variable(name)['name'] def get_variable_obj(self, name:str, ) -> 'Variable': # pylint: disable=C0111 return self._get_variable(name)['variableobj'] def get_variable_family_name(self, name: str, ) -> str: # pylint: disable=C0111 return self._get_variable(name)['family'] def get_variable_namespace(self, name: str, ) -> str: # pylint: disable=C0111 return self._get_variable(name)['namespace'] def get_variable_path(self, name: str, current_namespace: str, allow_source: str=False, with_suffix: bool=False, ) -> str: # pylint: disable=C0111 if current_namespace is None: # pragma: no cover raise OperationError('current_namespace must not be None') if with_suffix: dico, suffix = self._get_variable(name, with_suffix=True, ) else: dico = self._get_variable(name) if not allow_source: if dico['namespace'] not in [variable_namespace, 'services'] and current_namespace != dico['namespace']: raise DictConsistencyError(_('A variable located in the {} namespace ' 'shall not be used in the {} namespace').format( dico['namespace'], current_namespace)) if '.' in dico['name']: value = dico['name'] else: list_path = [dico['namespace'], dico['family']] if dico['leader'] is not None: list_path.append(dico['leader']) list_path.append(dico['name']) value = '.'.join(list_path) if with_suffix: return value, suffix return value def path_is_defined(self, name: str, ) -> str: # pylint: disable=C0111 if '.' not in name and name not in self.variables and name in self.full_paths: return True return name in self.variables def _get_variable(self, name: str, with_suffix: bool=False, ) -> str: if name not in self.variables: if name not in self.variables: if '.' not in name and name in self.full_paths: name = self.full_paths[name] if name not in self.variables: for var_name, variable in self.variables.items(): if variable['is_dynamic'] and name.startswith(var_name): if not with_suffix: raise Exception('This option is dynamic, should use "with_suffix" attribute') return variable, name[len(var_name):] if '.' not in name: for var_name, path in self.full_paths.items(): if name.startswith(var_name): variable = self.variables[self.full_paths[var_name]] if variable['is_dynamic']: if not with_suffix: raise Exception('This option is dynamic, should use "with_suffix" attribute') return variable, name[len(var_name):] raise DictConsistencyError(_('unknown option {}').format(name)) if with_suffix: return self.variables[name], None return self.variables[name]