222 lines
9.1 KiB
Python
222 lines
9.1 KiB
Python
"""Manage path to find objects
|
|
|
|
Created by:
|
|
EOLE (http://eole.orion.education.fr)
|
|
Copyright (C) 2005-2018
|
|
|
|
Forked by:
|
|
Cadoles (http://www.cadoles.com)
|
|
Copyright (C) 2019-2021
|
|
|
|
distribued with GPL-2 or later license
|
|
|
|
This program is free software; you can redistribute it and/or modify
|
|
it under the terms of the GNU General Public License as published by
|
|
the Free Software Foundation; either version 2 of the License, or
|
|
(at your option) any later version.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
|
"""
|
|
from typing import List
|
|
from .i18n import _
|
|
from .error import DictConsistencyError
|
|
from .utils import normalize_family
|
|
|
|
|
|
class Path:
|
|
"""Helper class to handle the `path` attribute.
|
|
|
|
sample: path="creole.general.condition"
|
|
"""
|
|
def __init__(self,
|
|
rougailconfig: 'RougailConfig',
|
|
) -> None:
|
|
self.variables = {}
|
|
self.families = {}
|
|
self.full_paths_families = {}
|
|
self.full_paths_variables = {}
|
|
self.variable_namespace = rougailconfig['variable_namespace']
|
|
|
|
# Family
|
|
def add_family(self,
|
|
namespace: str,
|
|
name: str,
|
|
variableobj: str,
|
|
subpath: str,
|
|
) -> str: # pylint: disable=C0111
|
|
"""Add a new family
|
|
"""
|
|
if namespace == self.variable_namespace:
|
|
full_name = '.'.join([subpath, name])
|
|
if name in self.full_paths_families:
|
|
msg = _(f'Duplicate family name "{name}"')
|
|
raise DictConsistencyError(msg, 55, variableobj.xmlfiles)
|
|
self.full_paths_families[name] = full_name
|
|
else:
|
|
if '.' not in name: # pragma: no cover
|
|
msg = _(f'Variable "{name}" in namespace "{namespace}" must have dot')
|
|
raise DictConsistencyError(msg, 39, variableobj.xmlfiles)
|
|
full_name = name
|
|
if full_name in self.families and \
|
|
self.families[full_name]['variableobj'] != variableobj: # pragma: no cover
|
|
msg = _(f'Duplicate family name "{name}"')
|
|
raise DictConsistencyError(msg, 37, variableobj.xmlfiles)
|
|
if full_name in self.variables:
|
|
msg = _(f'A variable and a family has the same path "{full_name}"')
|
|
raise DictConsistencyError(msg, 56, variableobj.xmlfiles)
|
|
self.families[full_name] = dict(name=name,
|
|
namespace=namespace,
|
|
variableobj=variableobj,
|
|
)
|
|
variableobj.path = full_name
|
|
|
|
def get_family(self,
|
|
name: str,
|
|
current_namespace: str,
|
|
) -> 'Family': # pylint: disable=C0111
|
|
"""Get a family
|
|
"""
|
|
name = '.'.join([normalize_family(subname) for subname in name.split('.')])
|
|
if name not in self.families and name in self.full_paths_families:
|
|
name = self.full_paths_families[name]
|
|
if name not in self.families:
|
|
raise DictConsistencyError(_(f'unknown option {name}'), 42, [])
|
|
dico = self.families[name]
|
|
if current_namespace not in [self.variable_namespace, 'services'] and \
|
|
current_namespace != dico['namespace']:
|
|
msg = _(f'A family located in the "{dico["namespace"]}" namespace '
|
|
f'shall not be used in the "{current_namespace}" namespace')
|
|
raise DictConsistencyError(msg, 38, [])
|
|
return dico['variableobj']
|
|
|
|
def is_leader(self, path): # pylint: disable=C0111
|
|
"""Is the variable is a leader
|
|
"""
|
|
variable = self._get_variable(path)
|
|
if not variable['leader']:
|
|
return False
|
|
leadership = self.get_family(variable['leader'], variable['variableobj'].namespace)
|
|
return next(iter(leadership.variable.values())).path == path
|
|
|
|
def is_follower(self, path):
|
|
"""Is the variable is a follower
|
|
"""
|
|
variable = self._get_variable(path)
|
|
if not variable['leader']:
|
|
return False
|
|
leadership = self.get_family(variable['leader'], variable['variableobj'].namespace)
|
|
return next(iter(leadership.variable.values())).path != path
|
|
|
|
# Variable
|
|
def add_variable(self, # pylint: disable=R0913
|
|
namespace: str,
|
|
name: str,
|
|
family: str,
|
|
is_dynamic: bool,
|
|
variableobj,
|
|
leader: 'self.objectspace.family'=None,
|
|
) -> str: # pylint: disable=C0111
|
|
"""Add a new variable (with path)
|
|
"""
|
|
if name == 'hostname_' and not is_dynamic:
|
|
raise Exception('pffff')
|
|
if '.' not in name:
|
|
full_path = '.'.join([family, name])
|
|
if namespace == self.variable_namespace:
|
|
self.full_paths_variables[name] = full_path
|
|
else:
|
|
full_path = name
|
|
variableobj.path = full_path
|
|
if full_path in self.families:
|
|
msg = _(f'A family and a variable has the same path "{full_path}"')
|
|
raise DictConsistencyError(msg, 57, variableobj.xmlfiles)
|
|
self.variables[full_path] = dict(name=name,
|
|
family=family,
|
|
leader=leader,
|
|
is_dynamic=is_dynamic,
|
|
variableobj=variableobj,
|
|
)
|
|
|
|
def get_variable(self,
|
|
name: str,
|
|
xmlfiles: List[str]=[],
|
|
) -> 'Variable': # pylint: disable=C0111
|
|
"""Get variable object from a path
|
|
"""
|
|
variable, suffix = self._get_variable(name, with_suffix=True, xmlfiles=xmlfiles)
|
|
if suffix:
|
|
raise DictConsistencyError(_(f"{name} is a dynamic variable"), 36, [])
|
|
return variable['variableobj']
|
|
|
|
def get_variable_family_path(self,
|
|
name: str,
|
|
xmlfiles: List[str]=False,
|
|
) -> str: # pylint: disable=C0111
|
|
"""Get the full path of a family
|
|
"""
|
|
return self._get_variable(name, xmlfiles=xmlfiles)['family']
|
|
|
|
def get_variable_path(self,
|
|
name: str,
|
|
current_namespace: str,
|
|
xmlfiles: List[str]=[],
|
|
) -> str: # pylint: disable=C0111
|
|
"""get full path of a variable
|
|
"""
|
|
dico, suffix = self._get_variable(name,
|
|
with_suffix=True,
|
|
xmlfiles=xmlfiles,
|
|
)
|
|
namespace = dico['variableobj'].namespace
|
|
if namespace not in [self.variable_namespace, 'services'] and \
|
|
current_namespace != 'services' and \
|
|
current_namespace != namespace:
|
|
msg = _(f'A variable located in the "{namespace}" namespace shall not be used '
|
|
f'in the "{current_namespace}" namespace')
|
|
raise DictConsistencyError(msg, 41, xmlfiles)
|
|
return dico['variableobj'].path, suffix
|
|
|
|
def path_is_defined(self,
|
|
path: str,
|
|
) -> str: # pylint: disable=C0111
|
|
"""The path is a valid path
|
|
"""
|
|
if '.' not in path and path not in self.variables and path in self.full_paths_variables:
|
|
return True
|
|
return path in self.variables
|
|
|
|
def variable_is_dynamic(self,
|
|
name: str,
|
|
) -> bool:
|
|
"""This variable is in dynamic family
|
|
"""
|
|
return self._get_variable(name)['is_dynamic']
|
|
|
|
def _get_variable(self,
|
|
name: str,
|
|
with_suffix: bool=False,
|
|
xmlfiles: List[str]=[],
|
|
) -> str:
|
|
name = '.'.join([normalize_family(subname) for subname in name.split('.')])
|
|
if name not in self.variables:
|
|
if '.' not in name and name in self.full_paths_variables:
|
|
name = self.full_paths_variables[name]
|
|
elif with_suffix:
|
|
for var_name, full_path in self.full_paths_variables.items():
|
|
if name.startswith(var_name):
|
|
variable = self._get_variable(full_path)
|
|
if variable['is_dynamic']:
|
|
return variable, name[len(var_name):]
|
|
if name not in self.variables:
|
|
raise DictConsistencyError(_(f'unknown option "{name}"'), 42, xmlfiles)
|
|
if with_suffix:
|
|
return self.variables[name], None
|
|
return self.variables[name]
|