rougail/src/rougail/annotator/service.py

321 lines
14 KiB
Python

"""Annotate services
Created by:
EOLE (http://eole.orion.education.fr)
Copyright (C) 2005-2018
Forked by:
Cadoles (http://www.cadoles.com)
Copyright (C) 2019-2021
distribued with GPL-2 or later license
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""
from os.path import basename
from typing import Tuple
from rougail.i18n import _
from rougail.utils import normalize_family
from rougail.error import DictConsistencyError
# a object's attribute has some annotations
# that shall not be present in the exported (flatened) XML
ERASED_ATTRIBUTES = ('redefine', 'namespace', 'xmlfiles', 'disabled', 'name', 'manage')
ERASED_ATTRIBUTES2 = ('redefine', 'namespace', 'xmlfiles', 'disabled')
ALLOW_ATTRIBUT_NOT_MANAGE = ['file', 'engine', 'target']
class Annotator:
"""Manage service's object
for example::
<services>
<service name="test">
<service_access service='ntp'>
</service_access>
</service>
</services>
"""
level = 20
def __init__(self,
objectspace,
*args,
) -> None:
self.objectspace = objectspace
self.uniq_overrides = []
if 'network_type' not in self.objectspace.types:
self.objectspace.types['network_type'] = self.objectspace.types['ip_type']
if hasattr(self.objectspace.space, 'services'):
if not hasattr(self.objectspace.space.services, 'service'):
del self.objectspace.space.services
else:
self.convert_services()
def convert_services(self):
"""convert services to variables
"""
self.objectspace.space.services.hidden = True
self.objectspace.space.services.name = 'services'
self.objectspace.space.services.doc = 'services'
self.objectspace.space.services.path = 'services'
for service_name, service in self.objectspace.space.services.service.items():
service.name = normalize_family(service_name)
activate_obj = self._generate_element('boolean',
None,
None,
'activate',
not service.disabled,
service,
'.'.join(['services', service.name, 'activate']),
)
service.disabled = None
for elttype, values in dict(vars(service)).items():
if elttype == 'servicelist':
self.objectspace.list_conditions.setdefault('servicelist',
{}).setdefault(
values,
[]).append(activate_obj)
continue
if elttype in ERASED_ATTRIBUTES:
continue
if not service.manage and elttype not in ALLOW_ATTRIBUT_NOT_MANAGE:
msg = _(f'unmanage service cannot have "{elttype}"')
raise DictConsistencyError(msg, 66, service.xmlfiles)
if isinstance(values, (dict, list)):
if elttype != 'ip':
eltname = elttype + 's'
else:
eltname = elttype
path = '.'.join(['services', service.name, eltname])
family = self._gen_family(eltname,
path,
service.xmlfiles,
with_informations=False,
)
if isinstance(values, dict):
values = list(values.values())
family.family = self.make_group_from_elts(service_name,
elttype,
values,
path,
)
setattr(service, elttype, family)
else:
if not hasattr(service, 'information'):
service.information = self.objectspace.information(service.xmlfiles)
setattr(service.information, elttype, values)
service.path = '.'.join(['services', service.name])
manage = self._generate_element('boolean',
None,
None,
'manage',
service.manage,
service,
'.'.join(['services', service.name, 'manage']),
)
service.variable = [activate_obj, manage]
service.doc = service_name
def make_group_from_elts(self,
service_name,
elttype,
elts,
path,
):
"""Splits each objects into a group (and `OptionDescription`, in tiramisu terms)
and build elements and its attributes (the `Options` in tiramisu terms)
"""
families = []
listname = '{}list'.format(elttype)
for elt in elts:
# try to launch _update_xxxx() function
update_elt = '_update_' + elttype
if hasattr(self, update_elt):
getattr(self, update_elt)(elt,
service_name,
)
c_name, subpath = self._get_name_path(elt,
path,
)
family = self._gen_family(c_name,
subpath,
elt.xmlfiles,
)
family.variable = []
if hasattr(elt, 'disabled'):
disabled = elt.disabled
else:
disabled = False
activate_obj = self._generate_element('boolean',
None,
None,
'activate',
not disabled,
elt,
'.'.join([subpath, 'activate']),
)
for key in dir(elt):
if key.startswith('_') or key.endswith('_type') or key in ERASED_ATTRIBUTES2:
continue
value = getattr(elt, key)
if key == listname:
self.objectspace.list_conditions.setdefault(listname,
{}).setdefault(
value,
[]).append(activate_obj)
continue
if key == 'name':
dtd_key_type = elttype + '_type'
else:
dtd_key_type = key + '_type'
elt_type = getattr(elt, dtd_key_type, None)
if elt_type:
if elt_type == 'variable':
elt_type = 'symlink'
family.variable.append(self._generate_element(elt_type,
dtd_key_type,
elttype,
key,
value,
elt,
f'{subpath}.{key}'
))
else:
setattr(family.information, key, value)
family.variable.append(activate_obj)
families.append(family)
return families
def _get_name_path(self,
elt,
path: str,
) -> Tuple[str, str]:
# create element name, if already exists, add _xx to be uniq
if hasattr(elt, 'source') and elt.source:
name = elt.source
else:
name = elt.name
idx = 0
while True:
c_name = name
if idx:
c_name += f'_{idx}'
subpath = '{}.{}'.format(path, normalize_family(c_name))
try:
self.objectspace.paths.get_family(subpath, 'services')
except DictConsistencyError as err:
if err.errno == 42:
return c_name, subpath
idx += 1
def _gen_family(self,
name,
path,
xmlfiles,
with_informations=True,
):
family = self.objectspace.family(xmlfiles)
family.name = normalize_family(name)
family.doc = name
family.mode = None
self.objectspace.paths.add_family('services',
path,
family,
None,
)
if with_informations:
family.information = self.objectspace.information(xmlfiles)
return family
def _generate_element(self,
type_,
dtd_key_type,
elttype,
key,
value,
elt,
path,
): # pylint: disable=R0913
variable = self.objectspace.variable(elt.xmlfiles)
variable.name = normalize_family(key)
variable.mode = None
variable.type = type_
if type_ == 'symlink':
variable.opt = self.objectspace.paths.get_variable(value)
variable.multi = None
needed_type = self.objectspace.types[dtd_key_type]
if needed_type not in ('variable', variable.opt.type):
msg = _(f'"{value}" in "{elttype}" must be a variable with type '
f'"{needed_type}" not "{variable.opt.type}"')
raise DictConsistencyError(msg, 58, elt.xmlfiles)
else:
variable.doc = key
variable.default = value
variable.namespace = 'services'
self.objectspace.paths.add_variable('services',
path,
'service',
False,
variable,
)
return variable
def _update_override(self,
override,
service_name,
):
if service_name in self.uniq_overrides:
msg = _('only one override is allowed by service, '
'please use a variable multiple if you want have more than one IP')
raise DictConsistencyError(msg, 69, override.xmlfiles)
self.uniq_overrides.append(service_name)
override.name = service_name
if not hasattr(override, 'source'):
override.source = f'{service_name}.service'
@staticmethod
def _update_file(file_,
service_name,
):
if not hasattr(file_, 'file_type') or file_.file_type == "filename":
if not hasattr(file_, 'source'):
file_.source = basename(file_.name)
elif not hasattr(file_, 'source'):
msg = _(f'attribute "source" is mandatory for the file "{file_.name}" '
f'"({service_name})"')
raise DictConsistencyError(msg, 34, file_.xmlfiles)
def _update_ip(self,
ip,
service_name,
) -> None:
variable = self.objectspace.paths.get_variable(ip.name, ip.xmlfiles)
if variable.type not in ['ip', 'network', 'network_cidr']:
msg = _(f'ip cannot be linked to "{variable.type}" variable "{ip.name}"')
raise DictConsistencyError(msg, 70, ip.xmlfiles)
if variable.type in ['ip', 'network_cidr'] and hasattr(ip, 'netmask'):
msg = _(f'ip with ip_type "{variable.type}" must not have netmask')
raise DictConsistencyError(msg, 59, ip.xmlfiles)
if variable.type == 'network' and not hasattr(ip, 'netmask'):
msg = _(f'ip with ip_type "{variable.type}" must have netmask')
raise DictConsistencyError(msg, 64, ip.xmlfiles)
if hasattr(ip, 'netmask'):
netmask = self.objectspace.paths.get_variable(ip.netmask, ip.xmlfiles)
if netmask.type != 'netmask':
msg = _(f'netmask in ip must have type "netmask", not "{netmask.type}"')
raise DictConsistencyError(msg, 65, ip.xmlfiles)