rougail/src/rougail/annotator/service.py

248 lines
10 KiB
Python

"""Annotate services
Created by:
EOLE (http://eole.orion.education.fr)
Copyright (C) 2005-2018
Forked by:
Cadoles (http://www.cadoles.com)
Copyright (C) 2019-2021
distributed under the GPL-2 or later license
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""
from os.path import basename
from typing import Tuple
from ..i18n import _
from ..utils import normalize_family
from ..error import DictConsistencyError
# Attributes that CreoleObjSpace objects carry as internal annotations.
# They shall not be present in the exported (flattened) XML, so code that
# walks an object's attributes skips every name listed here.
ERASED_ATTRIBUTES = (
    'redefine',
    'exists',
    'optional',
    'remove_check',
    'namespace',
    'remove_condition',
    'path',
    'instance_mode',
    'index',
    'level',
    'remove_fill',
    'xmlfiles',
    'type',
    'reflector_name',
    'reflector_object',
)
class ServiceAnnotator:
    """Manage service's object.

    For example::

        <services>
            <service name="test">
                <service_access service='ntp'>
                    <port protocol='udp' service_accesslist='ntp_udp'>123</port>
                </service_access>
            </service>
        </services>

    Every list/dict attribute of a service becomes a family named
    ``<attribute>s`` registered under ``services.<service name>``.  Every
    element held by that attribute becomes a sub-family whose attributes are
    exposed as variables, plus one boolean ``activate`` variable.
    """

    def __init__(self, objectspace):
        """Store *objectspace* and annotate its services right away.

        Nothing is done when the space has no ``services`` attribute.  A
        ``services`` node without any ``service`` child is deleted from the
        space; otherwise :meth:`convert_services` is run.
        """
        self.objectspace = objectspace
        space = self.objectspace.space
        if not hasattr(space, 'services'):
            return
        if hasattr(space.services, 'service'):
            self.convert_services()
        else:
            del space.services

    def convert_services(self):
        """Convert services to variables.

        Marks the root ``services`` object as hidden and names it
        ``'services'``, then replaces every list/dict attribute of each
        service by a family built with :meth:`make_group_from_elts`.
        """
        services = self.objectspace.space.services
        services.hidden = True
        services.name = 'services'
        services.doc = 'services'
        services.path = 'services'
        for service_name, service in services.service.items():
            service.information = self.objectspace.information(service.xmlfiles)
            # Walk a snapshot of the attributes: the loop body rebinds
            # attributes on `service` through setattr().
            for elttype, values in dict(vars(service)).items():
                if not isinstance(values, (dict, list)) or elttype in ERASED_ATTRIBUTES:
                    continue
                eltname = elttype + 's'
                path = '.'.join(['services', normalize_family(service_name), eltname])
                family = self._gen_family(eltname,
                                          path,
                                          service.xmlfiles,
                                          )
                if isinstance(values, dict):
                    values = list(values.values())
                family.family = self.make_group_from_elts(service_name,
                                                          elttype,
                                                          values,
                                                          path,
                                                          )
                setattr(service, elttype, family)
            service.doc = service.name

    def make_group_from_elts(self,
                             service_name,
                             elttype,
                             elts,
                             path,
                             ):
        """Split each object into a group (an `OptionDescription`, in tiramisu terms)
        and build its elements and attributes (the `Options` in tiramisu terms).

        :param service_name: name of the service owning the elements; passed
            to the optional ``_update_<elttype>`` hook
        :param elttype: attribute name the elements come from (e.g. ``file``);
            selects the hook and the ``<elttype>list`` / ``<elttype>_type`` keys
        :param elts: iterable of element objects to convert
        :param path: dotted path of the parent family
        :return: list with one family per element, in input order
        """
        families = []
        listname = f'{elttype}list'
        # Optional per-type hook (`_update_file`, `_update_override`, ...);
        # it does not depend on the element, so resolve it only once.
        update_elt = getattr(self, '_update_' + elttype, None)
        for elt in elts:
            if update_elt is not None:
                update_elt(elt,
                           service_name,
                           )
            c_name, subpath = self._get_name_path(elt,
                                                  path,
                                                  )
            family = self._gen_family(c_name,
                                      subpath,
                                      elt.xmlfiles,
                                      )
            family.variable = []
            activate_obj = self._generate_element('boolean',
                                                  None,
                                                  None,
                                                  'activate',
                                                  True,
                                                  elt,
                                                  '.'.join([subpath, 'activate']),
                                                  )
            for key in dir(elt):
                # skip private names, the "<attr>_type" companions and
                # internal annotation attributes
                if key.startswith('_') or key.endswith('_type') or key in ERASED_ATTRIBUTES:
                    continue
                value = getattr(elt, key)
                if key == listname:
                    # "<elttype>list" is not exported as a variable: it only
                    # records the element's `activate` variable under that
                    # list name in objectspace.list_conditions.
                    self.objectspace.list_conditions.setdefault(listname,
                                                                {}).setdefault(
                                                                    value,
                                                                    []).append(activate_obj)
                    continue
                # the type of the `name` attribute is stored as "<elttype>_type"
                if key == 'name':
                    dtd_key_type = elttype + '_type'
                else:
                    dtd_key_type = key + '_type'
                elt_type = getattr(elt, dtd_key_type, 'string')
                if elt_type == 'variable':
                    elt_type = 'symlink'
                family.variable.append(self._generate_element(elt_type,
                                                              dtd_key_type,
                                                              elttype,
                                                              key,
                                                              value,
                                                              elt,
                                                              f'{subpath}.{key}'
                                                              ))
            # `activate` is created first but stored last in the family
            family.variable.append(activate_obj)
            families.append(family)
        return families

    def _get_name_path(self,
                       elt,
                       path: str,
                       ) -> Tuple[str, str]:
        """Create the element name; if it already exists, add ``_xx`` to be unique.

        The base name is ``elt.source`` when the element has one, otherwise
        ``elt.name``.  Candidates ``name``, ``name_1``, ``name_2``, ... are
        probed with ``objectspace.paths.get_family`` until the lookup fails
        with errno 42, which is taken to mean the path is still free.

        :return: ``(c_name, subpath)`` for the first free candidate
        :raises DictConsistencyError: any lookup error other than errno 42 is
            propagated instead of being ignored
        """
        if hasattr(elt, 'source'):
            name = elt.source
        else:
            name = elt.name
        idx = 0
        while True:
            c_name = name
            if idx:
                c_name += f'_{idx}'
            subpath = f'{path}.{normalize_family(c_name)}'
            try:
                self.objectspace.paths.get_family(subpath, 'services')
            except DictConsistencyError as err:
                if err.errno == 42:
                    return c_name, subpath
                # An unrelated error must not be mistaken for "name already
                # taken": swallowing it could loop forever.
                raise
            idx += 1

    def _gen_family(self,
                    name,
                    path,
                    xmlfiles
                    ):
        """Create a family called *name* and register it at *path*.

        The family name is normalized with ``normalize_family`` while its
        ``doc`` keeps the raw *name*; ``mode`` is reset to ``None``.

        :return: the new family object
        """
        family = self.objectspace.family(xmlfiles)
        family.name = normalize_family(name)
        family.doc = name
        family.mode = None
        self.objectspace.paths.add_family('services',
                                          path,
                                          family,
                                          None,
                                          )
        return family

    def _generate_element(self,
                          type_,
                          dtd_key_type,
                          elttype,
                          key,
                          value,
                          elt,
                          path,
                          ): # pylint: disable=R0913
        """Create the variable for attribute *key* of *elt* and register it at *path*.

        :param type_: variable type; ``'symlink'`` makes the variable point to
            the existing variable whose path is *value*
        :param dtd_key_type: key into ``objectspace.types`` giving the expected
            type; only read for symlinks
        :param elttype: element type name, only used in the error message
        :param key: attribute name, normalized to build the variable name
        :param value: default value, or target variable path for a symlink
        :param elt: source element, provides ``xmlfiles``
        :param path: dotted path under which the variable is registered
        :return: the new variable object
        :raises DictConsistencyError: (errno 58) when a symlink target has a
            type different from the expected one
        """
        variable = self.objectspace.variable(elt.xmlfiles)
        variable.name = normalize_family(key)
        variable.mode = None
        variable.type = type_
        if type_ == 'symlink':
            variable.opt = self.objectspace.paths.get_variable(value)
            variable.multi = None
            expected_type = self.objectspace.types[dtd_key_type]
            # an expected type of 'variable' accepts any target type
            if expected_type != 'variable' and variable.opt.type != expected_type:
                msg = _(f'"{key}" in "{elttype}" must be a variable with type '
                        f'"{self.objectspace.types[dtd_key_type]}" not "{variable.opt.type}"')
                raise DictConsistencyError(msg, 58, elt.xmlfiles)
        else:
            variable.doc = key
            variable.default = value
        variable.namespace = 'services'
        self.objectspace.paths.add_variable('services',
                                            path,
                                            'service',
                                            False,
                                            variable,
                                            )
        return variable

    def _update_override(self,
                         file_,
                         service_name,
                         ):
        """Prepare an ``override`` element before it is turned into a family.

        Looked up dynamically by :meth:`make_group_from_elts` as
        ``'_update_' + elttype``.  The element name is forced to
        ``/systemd/system/<service_name>.service.d/rougail.conf``, ``owner``,
        ``group`` and ``mode`` are copied from ``objectspace.file``, and
        ``source`` defaults to ``<service_name>.service``.
        """
        file_.name = f'/systemd/system/{service_name}.service.d/rougail.conf'
        # retrieve default value from File object
        for attr in ['owner', 'group', 'mode']:
            setattr(file_, attr, getattr(self.objectspace.file, attr))
        if not hasattr(file_, 'source'):
            file_.source = f'{service_name}.service'
        self._update_file(file_,
                          service_name,
                          )

    def _update_file(self,
                     file_,
                     service_name,
                     ):
        """Make sure a ``file`` element has a ``source`` attribute.

        Looked up dynamically by :meth:`make_group_from_elts` as
        ``'_update_' + elttype``.  When ``file_type`` is absent or equal to
        ``"filename"``, a missing ``source`` defaults to the base name of
        ``file_.name``.  For any other ``file_type`` the source cannot be
        deduced and must be given explicitly.

        :raises DictConsistencyError: (errno 34) when ``source`` is missing
            and ``file_type`` is something other than ``"filename"``
        """
        if not hasattr(file_, 'file_type') or file_.file_type == "filename":
            if not hasattr(file_, 'source'):
                file_.source = basename(file_.name)
        elif not hasattr(file_, 'source'):
            msg = _(f'attribute "source" is mandatory for the file "{file_.name}" '
                    f'"({service_name})"')
            raise DictConsistencyError(msg, 34, file_.xmlfiles)